概要
大きいDataFrameをページで区切りながらtoPandas()して保存してみる。
バージョン情報
- Spark 2.4.3
サンプルデータの用意
1〜100万までのIDとランダムなスコアを持つDataFrameを用意する。
import pandas as pd import numpy as np np.random.seed(0) # 100万行のデータを用意(1つ目) pandas_df = pd.DataFrame( np.random.random([1000000, 1]), columns=['score']) pandas_df['key'] = np.arange(1, 1000001) # SparkのDataFrameに変換 df = spark.createDataFrame(pandas_df) df.take(3) #=> [Row(score=0.5488135039273248, key=1), #=> Row(score=0.7151893663724195, key=2), #=> Row(score=0.6027633760716439, key=3)]
これを10ページに区切って落としてみる。
DataFrameのPagination
DataFrameではlimitにoffsetを付けることはできないみたいなので、zipWithIndexで番号を付けて条件分岐させることにする。
# カウントを取得 df_count = df.count() #=> 1000000 # zipWithIndexしてrowに番号を振る rdd = df.rdd.zipWithIndex() # rddの中身はこんな感じでRowと連番のtupleになる rdd.take(3) #=> [(Row(score=0.5488135039273248, key=1), 0), #=> (Row(score=0.7151893663724195, key=2), 1), #=> (Row(score=0.6027633760716439, key=3), 2)] # 10分割してそれぞれ処理を行う PAGE_COUNT = 10 # 1ページごとに10万レコード PAGE_SIZE = 100000 for i in range(PAGE_COUNT): start, end = PAGE_SIZE * i, PAGE_SIZE * (i + 1) # ページ範囲のデータを取得する page = rdd.filter(lambda row_idx: row_idx[1] >= start and row_idx[1] < end) # 取得したpageからzipWithIndexで付けたindexを除去しDataFrameを再作成 page_df = page.map(lambda row_idx: row_idx[0]).toDF() # toPandasしてto_csvして保存する page_df.toPandas().to_csv('%d.csv.gz' % i, compression='gzip')
randomにページ分割
オーダーを気にしないのであればランダムにページ番号を振ってwhereで落とす方法も考えられる。コード的にはこちらの方がシンプル。
まずはfunctions.randomで0〜9の番号を振る(カラム名はpageとする)。
from pyspark.sql import functions as F # ページ番号を振る df = df.withColumn('page', (F.rand()*PAGE_COUNT).cast("int")) df.take(5) #=> [Row(score=0.5488135039273248, key=1, page=9), #=> Row(score=0.7151893663724195, key=2, page=3), #=> Row(score=0.6027633760716439, key=3, page=9), #=> Row(score=0.5448831829968969, key=4, page=4), #=> Row(score=0.4236547993389047, key=5, page=4)
あとはアサインされたpageの値ごとにto_csvしていく。
for i in range(PAGE_COUNT): df.where(df.page == i) \ .drop('page') \ .toPandas() \ .to_csv('%d.csv.gz' % i, compression='gzip')
改定履歴
Author: Masato Watanabe, Date: 2019-09-01, 記事投稿