iMind Developers Blog

iMind開発者ブログ

PySparkのDataFrameをPagination

概要

大きいDataFrameをページで区切りながらtoPandas()して保存してみる。

バージョン情報

  • Spark 2.4.3

サンプルデータの用意

1〜100万までのIDとランダムなスコアを持つDataFrameを用意する。

import pandas as pd
import numpy as np

np.random.seed(0)

# 100万行のデータを用意(1つ目)
pandas_df = pd.DataFrame(
    np.random.random([1000000, 1]),
    columns=['score'])
pandas_df['key'] = np.arange(1, 1000001)

# SparkのDataFrameに変換
df = spark.createDataFrame(pandas_df)
df.take(3)
    #=> [Row(score=0.5488135039273248, key=1),
    #=>  Row(score=0.7151893663724195, key=2),
    #=>  Row(score=0.6027633760716439, key=3)]

これを10ページに区切って落としてみる。

DataFrameのPagination

DataFrameではlimitにoffsetを付けることはできないみたいなので、zipWithIndexで番号を付けて条件分岐させることにする。

# カウントを取得
df_count = df.count()
    #=> 1000000

# zipWithIndexしてrowに番号を振る
rdd = df.rdd.zipWithIndex()

# rddの中身はこんな感じでRowと連番のtupleになる
rdd.take(3)
    #=> [(Row(score=0.5488135039273248, key=1), 0),
    #=>  (Row(score=0.7151893663724195, key=2), 1),
    #=>  (Row(score=0.6027633760716439, key=3), 2)]

# 10分割してそれぞれ処理を行う
PAGE_COUNT = 10
# 1ページごとに10万レコード
PAGE_SIZE = 100000

for i in range(PAGE_COUNT):
    start, end = PAGE_SIZE * i, PAGE_SIZE * (i + 1)
    # ページ範囲のデータを取得する
    page = rdd.filter(lambda row_idx: row_idx[1] >= start and row_idx[1] < end)
    # 取得したpageからzipWithIndexで付けたindexを除去しDataFrameを再作成
    page_df = page.map(lambda row_idx: row_idx[0]).toDF()
    # toPandasしてto_csvして保存する
    page_df.toPandas().to_csv('%d.csv.gz' % i, compression='gzip')

randomにページ分割

オーダーを気にしないのであればランダムにページ番号を振ってwhereで落とす方法も考えられる。コード的にはこちらの方がシンプル。

まずはfunctions.randomで0〜9の番号を振る(カラム名はpageとする)。

from pyspark.sql import functions as F 

# ページ番号を振る
df = df.withColumn('page', (F.rand()*PAGE_COUNT).cast("int"))
df.take(5)
    #=> [Row(score=0.5488135039273248, key=1, page=9),
    #=>  Row(score=0.7151893663724195, key=2, page=3),
    #=>  Row(score=0.6027633760716439, key=3, page=9),
    #=>  Row(score=0.5448831829968969, key=4, page=4),
    #=>  Row(score=0.4236547993389047, key=5, page=4)

あとはアサインされたpageの値ごとにto_csvしていく。

for i in range(PAGE_COUNT):
    df.where(df.page == i) \
        .drop('page') \
        .toPandas() \
        .to_csv('%d.csv.gz' % i, compression='gzip')

改定履歴

Author: Masato Watanabe, Date: 2019-09-01, 記事投稿