概要
PySparkでexplodeやflatMap、mapPartitionsを用いて1レコードを複数レコードに加工する。またgroupByで元の1レコードに戻す。
バージョン情報
- spark-2.3.1
- Python 3.5.5
サンプルデータ
id, title, tagsの3つのカラムを持つcsvファイルがあるとする。tagsはスペース区切りで複数の要素を持つ。
id,title,tags 1,アルジャーノンに花束を,アメリカ ダニエル キイス 1959年 2,月は無慈悲な夜の女王,アメリカ ロバート ハインライン 1966年 3,ファウンデーション,ロシア アイザック アシモフ 1951年 4,地底旅行,フランス ジュール ヴェルヌ 1864年 5,星を継ぐもの,イギリス ジェイムズ ホーガン 1977年
このcsvをHDFS上に置いておく。
$ hadoop fs -put sf.csv sf.csv
csvファイルの読み込み
header=Trueを指定してspark.read.csvする。
df = spark \ .read \ .option('header', True) \ .csv('sf.csv') df.take(5)
結果
[Row(id='1', title='アルジャーノンに花束を', tags='アメリカ ダニエル キイス 1959年'), Row(id='2', title='月は無慈悲な夜の女王', tags='アメリカ ロバート ハインライン 1966年'), Row(id='3', title='ファウンデーション', tags='ロシア アイザック アシモフ 1951年'), Row(id='4', title='地底旅行', tags='フランス ジュール ヴェルヌ 1864年'), Row(id='5', title='星を継ぐもの', tags='イギリス ジェイムズ ホーガン 1977年')]
splitして文字列をlistに
splitでスペース区切りのtagsをlistに変換する。
from pyspark.sql import functions as F df = df.withColumn('tags', F.split(F.col('tags'), ' ')) df.take(5)
結果
[Row(id='1', title='アルジャーノンに花束を', tags=['アメリカ', 'ダニエル', 'キイス', '1959年']), Row(id='2', title='月は無慈悲な夜の女王', tags=['アメリカ', 'ロバート', 'ハインライン', '1966年']), Row(id='3', title='ファウンデーション', tags=['ロシア', 'アイザック', 'アシモフ', '1951年']), Row(id='4', title='地底旅行', tags=['フランス', 'ジュール', 'ヴェルヌ', '1864年']), Row(id='5', title='星を継ぐもの', tags=['イギリス', 'ジェイムズ', 'ホーガン', '1977年'])]
explodeでlistを複数行に変換する
listが入っているtagsに対してexplodeして複数行に変換する。
df_e = df.select('id', F.explode(F.col('tags')).alias('tag')) df_e.take(10)
結果
[Row(id='1', tag='アメリカ'), Row(id='1', tag='ダニエル'), Row(id='1', tag='キイス'), Row(id='1', tag='1959年'), Row(id='2', tag='アメリカ'), Row(id='2', tag='ロバート'), Row(id='2', tag='ハインライン'), Row(id='2', tag='1966年'), Row(id='3', tag='ロシア'), Row(id='3', tag='アイザック')]
groupbyで複数行をlistに戻す
df_g = df_e.groupby('id').agg(F.collect_list('tag').alias('tags')) df_g.take(10)
結果
[Row(id='3', tags=['ロシア', 'アイザック', 'アシモフ', '1951年']), Row(id='1', tags=['アメリカ', 'ダニエル', 'キイス', '1959年']), Row(id='5', tags=['イギリス', 'ジェイムズ', 'ホーガン', '1977年']), Row(id='4', tags=['フランス', 'ジュール', 'ヴェルヌ', '1864年']), Row(id='2', tags=['アメリカ', 'ロバート', 'ハインライン', '1966年'])]
flatMapを用いてexplode
explodeの代わりにflatMapを用いて展開してみる。
df_e = df.rdd.flatMap(lambda row: [(row[0], tag) for tag in row[2]]) df.take(10)
結果
[('1', 'アメリカ'), ('1', 'ダニエル'), ('1', 'キイス'), ('1', '1959年'), ('2', 'アメリカ'), ('2', 'ロバート'), ('2', 'ハインライン'), ('2', '1966年'), ('3', 'ロシア'), ('3', 'アイザック')]
mapPartitionsを用いてexplode
mapPartitionsはpartitionごとに一括で処理を噛ませることができる。
tagの各要素でyieldすればexplodeやflatMapと同じように1行を複数行に変換できる。
from pyspark.sql import Row def explode(rows): for row in rows: for tag in row[2]: yield Row(id=row[0], tag=tag) df_e = df.rdd.mapPartitions(explode) df_e.take(10)
結果
[Row(id='1', tag='アメリカ'), Row(id='1', tag='ダニエル'), Row(id='1', tag='キイス'), Row(id='1', tag='1959年'), Row(id='2', tag='アメリカ'), Row(id='2', tag='ロバート'), Row(id='2', tag='ハインライン'), Row(id='2', tag='1966年'), Row(id='3', tag='ロシア'), Row(id='3', tag='mapPartitions')]
yieldする前の段階で条件判定を入れたり複雑な処理が書けるので、込み入ったロジックを書かないといけない時にお世話になる。
改定履歴
Author: Masato Watanabe, Date: 2019-03-07, 記事投稿