iMind Developers Blog

iMind開発者ブログ

PySparkでpartition数を指定してソートする

概要

PySparkで下記のようにソートしてからファイルに保存しようと思った場合。

df.repartition(5).orderBy(df.colA).write ...

この記述は出力されるファイル数が5になることを期待しているが、orderByの際にpartition数が変動してしまう為期待した結果にはならない。

こうした場合、rddのsortByを使うと狙いに近い動作が可能になる。

バージョン情報

  • Spark 2.4.3

サンプルデータ

0〜29までの数値を持つ30行のDataFrameを生成する。

import pandas as pd
import numpy as np

# 0〜100までの値がランダムに入った10行のデータを用意
pandas_df = pd.DataFrame(
    np.arange(30),
    columns=['A'])

df = spark.createDataFrame(pandas_df)
df.take(3)
    #=> [Row(A=0), Row(A=1), Row(A=2)] 

これを3つのpartitionにわけてcsvでwriteした際に、ファイル1は0〜9, ファイル2は10〜19のようにソートされた状態で分割するのが今回の目的。

失敗例

下記のようにrepartition(3)してからorderByしてもpartition数は3にならない。

df.repartition(3) \ 
    .orderBy('A') \
    .write.mode('overwrite').csv("example.csv")

ファイル数を確認してみる。

$ hadoop fs -ls example.csv | grep part- | wc -l

30

見事に1ファイルずつに分割されてしまった(このあたりの挙動はSparkの設定によって異なります)。

RDD.sortByを利用する

RDDのsortByを用いると数は揃う。

df.repartition(3).rdd.sortBy(lambda r: r.A).toDF() \
    .write.mode('overwrite').csv("example.csv")

ファイル数を確認してみる。

$ hadoop fs -ls example.csv | grep part- | wc -l

3

1つ目のファイルを確認する。

$ hadoop fs -text example.csv/part-00000-*

0
1
2
3
4
5
6
7
8
9
10

ちゃんとソートされた状態でデータが入っている。

RDD.sortByにはパーティション数を指定するnumPartitionsという引数が指定できるので、事前処理でpartition数が定まっていなければ下記のように記述した方が良さそう。

df.rdd.sortBy(lambda r: r.A, numPartitions=3).toDF() \
    .write.mode('overwrite').csv("example.csv")

descでソート

descでソートしたい場合はascending=Falseを引数に渡す。

df.rdd.sortBy(
    lambda r: r.A, ascending=False, numPartitions=3).toDF() \
    .write.mode('overwrite').csv("example.csv")

中身確認

$ hadoop fs -text example.csv/part-00000*

29
28
27
26
25
24
23
22
21

改定履歴

Author: Masato Watanabe, Date: 2019-09-21, 記事投稿