概要
PySparkで下記のようにソートしてからファイルに保存しようと思った場合。
df.repartition(5).orderBy(df.colA).write ...
この記述は出力されるファイル数が5になることを期待しているが、orderByの際にpartition数が変動してしまう為期待した結果にはならない。
こうした場合、rddのsortByを使うと狙いに近い動作が可能になる。
バージョン情報
- Spark 2.4.3
サンプルデータ
0〜29までの数値を持つ30行のDataFrameを生成する。
import pandas as pd import numpy as np # 0〜100までの値がランダムに入った10行のデータを用意 pandas_df = pd.DataFrame( np.arange(30), columns=['A']) df = spark.createDataFrame(pandas_df) df.take(3) #=> [Row(A=0), Row(A=1), Row(A=2)]
これを3つのpartitionにわけてcsvでwriteした際に、ファイル1は0〜9, ファイル2は10〜19のようにソートされた状態で分割するのが今回の目的。
失敗例
下記のようにrepartition(3)してからorderByしてもpartition数は3にならない。
df.repartition(3) \ .orderBy('A') \ .write.mode('overwrite').csv("example.csv")
ファイル数を確認してみる。
$ hadoop fs -ls example.csv | grep part- | wc -l 30
見事に1ファイルずつに分割されてしまった(このあたりの挙動はSparkの設定によって異なります)。
RDD.sortByを利用する
RDDのsortByを用いると数は揃う。
df.repartition(3).rdd.sortBy(lambda r: r.A).toDF() \ .write.mode('overwrite').csv("example.csv")
ファイル数を確認してみる。
$ hadoop fs -ls example.csv | grep part- | wc -l 3
1つ目のファイルを確認する。
$ hadoop fs -text example.csv/part-00000-* 0 1 2 3 4 5 6 7 8 9 10
ちゃんとソートされた状態でデータが入っている。
RDD.sortByにはパーティション数を指定するnumPartitionsという引数が指定できるので、事前処理でpartition数が定まっていなければ下記のように記述した方が良さそう。
df.rdd.sortBy(lambda r: r.A, numPartitions=3).toDF() \ .write.mode('overwrite').csv("example.csv")
descでソート
descでソートしたい場合はascending=Falseを引数に渡す。
df.rdd.sortBy( lambda r: r.A, ascending=False, numPartitions=3).toDF() \ .write.mode('overwrite').csv("example.csv")
中身確認
$ hadoop fs -text example.csv/part-00000* 29 28 27 26 25 24 23 22 21
改定履歴
Author: Masato Watanabe, Date: 2019-09-21, 記事投稿