概要
Spark SQLでSELECTした値をINSERT OVERWRITEしようとした際にファイルが細かくsplitされてHDFSの容量をムダに喰ってしまうことがある。
Spark SQLではrepartitionヒント文を使うことで分割するファイル数を指定することができる。
バージョン情報
- Spark 2.4.3
ヒント文
Spark SQL2.4からREPARTITIONとCOALESCEヒント文が追加されている。
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-hint-framework.html
ヒント文にはその他にもMAPJOIN、BROADCASTJOIN、BROADCASTが用意されている。
サンプルデータの用意
今回の実演の為のサンプルデータを用意する。
import pandas as pd import numpy as np # 1万行くらいのデータを用意 pandas_df = pd.DataFrame( np.random.random([10000, 3]), columns=['A', 'B', 'C']) # SparkのDataFrameに変換 spark_df = spark.createDataFrame(pandas_df) # INSERTする際にファイルが分割されるようにrepartition(10)しておく。 spark_df = spark_df.repartition(10) # tmp_example_tableという名前でテーブルに入れておく spark_df.registerTempTable('tmp_example_table')
これをINSERTするテーブルもhiveに用意しておく。
CREATE TABLE tmp.example_table ( a float, b float, c float )
REPARTITION
まずはREPARTITIONせずにINSERTしてみる。
spark.sql(""" INSERT OVERWRITE TABLE tmp.example_table SELECT a, b, c FROM tmp_example_table """)
これを実行してテーブルのLOCATIONを確認するとファイルが10個できている。
続いてヒント文。repartitionして2個にしてみる。
spark.sql(""" INSERT OVERWRITE TABLE tmp.example_table SELECT /*+ REPARTITION(2) */ a, b, c FROM tmp_example_table """)
これでファイル数は2個になる。
COALESCEの場合は下記のようになる。
spark.sql(""" INSERT OVERWRITE TABLE tmp.example_table SELECT /*+ COALESCE(2) */ a, b, c FROM tmp_example_table """)
REPARTITIONとCOALESCEの違い
REPARTITIONはフルシャッフルしてから指定したパーティション数にファイルを分割する。
COALESCEは既存のパーティションを結合して指定ファイル数にまとめる。
REPARTITIONはデータが等分に分割されるのがメリット。COALESCEはシャッフルしないので実行が軽いのがメリット。
具体的に差が出る例として、REPARTITION(3)とCOALESCE(3)の際でファイルサイズの違いを見てみる。
まずはREPARTITION(3)した場合。シャッフルするので各ファイルサイズは下記のようにほぼ等分になる。
file_name file_size ファイル1 78375 ファイル2 78236 ファイル3 78368
続いてCOALESCE(3)。元データが10個のファイルでそれを結合して3個にするので、既存のpartitionを3, 3, 4で集めてくることになる。
file_name file_size ファイル1 70522 ファイル2 70569 ファイル3 93164
ファイル3だけサイズが大きい分割になっている。
またCOALESCEはファイルを結合するのみで分割はしない為、現状のpartition数が10であるのに対して例えばCOALESCE(15)してもファイル数は増えない。
改定履歴
Author: Masato Watanabe, Date: 2019-07-05, 記事投稿