iMind Developers Blog

iMind開発者ブログ

PySparkで1レコードを複数行にする

概要

PySparkでexplodeやflatMap、mapPartitionsを用いて1レコードを複数レコードに加工する。またgroupByで元の1レコードに戻す。

バージョン情報

  • spark-2.3.1
  • Python 3.5.5

サンプルデータ

id, title, tagsの3つのカラムを持つcsvファイルがあるとする。tagsはスペース区切りで複数の要素を持つ。

id,title,tags
1,アルジャーノンに花束を,アメリカ ダニエル キイス 1959年
2,月は無慈悲な夜の女王,アメリカ ロバート ハインライン 1966年
3,ファウンデーション,ロシア アイザック アシモフ 1951年
4,地底旅行,フランス ジュール ヴェルヌ 1864年
5,星を継ぐもの,イギリス ジェイムズ ホーガン 1977年

このcsvをHDFS上に置いておく。

$ hadoop fs -put sf.csv sf.csv

csvファイルの読み込み

header=Trueを指定してspark.read.csvする。

df = spark \
    .read \
    .option('header', True) \
    .csv('sf.csv')
df.take(5)

結果

[Row(id='1', title='アルジャーノンに花束を', tags='アメリカ ダニエル キイス 1959年'),
 Row(id='2', title='月は無慈悲な夜の女王', tags='アメリカ ロバート ハインライン 1966年'),
 Row(id='3', title='ファウンデーション', tags='ロシア アイザック アシモフ 1951年'),
 Row(id='4', title='地底旅行', tags='フランス ジュール ヴェルヌ 1864年'),
 Row(id='5', title='星を継ぐもの', tags='イギリス ジェイムズ ホーガン 1977年')]

splitして文字列をlistに

splitでスペース区切りのtagsをlistに変換する。

from pyspark.sql import functions as F
df = df.withColumn('tags', F.split(F.col('tags'), ' '))
df.take(5)

結果

[Row(id='1', title='アルジャーノンに花束を', tags=['アメリカ', 'ダニエル', 'キイス', '1959年']),
 Row(id='2', title='月は無慈悲な夜の女王', tags=['アメリカ', 'ロバート', 'ハインライン', '1966年']),
 Row(id='3', title='ファウンデーション', tags=['ロシア', 'アイザック', 'アシモフ', '1951年']),
 Row(id='4', title='地底旅行', tags=['フランス', 'ジュール', 'ヴェルヌ', '1864年']),
 Row(id='5', title='星を継ぐもの', tags=['イギリス', 'ジェイムズ', 'ホーガン', '1977年'])]

explodeでlistを複数行に変換する

listが入っているtagsに対してexplodeして複数行に変換する。

df_e = df.select('id', F.explode(F.col('tags')).alias('tag'))
df_e.take(10)

結果

[Row(id='1', tag='アメリカ'),
 Row(id='1', tag='ダニエル'),
 Row(id='1', tag='キイス'),
 Row(id='1', tag='1959年'),
 Row(id='2', tag='アメリカ'),
 Row(id='2', tag='ロバート'),
 Row(id='2', tag='ハインライン'),
 Row(id='2', tag='1966年'),
 Row(id='3', tag='ロシア'),
 Row(id='3', tag='アイザック')]

groupbyで複数行をlistに戻す

df_g = df_e.groupby('id').agg(F.collect_list('tag').alias('tags'))
df_g.take(10)

結果

[Row(id='3', tags=['ロシア', 'アイザック', 'アシモフ', '1951年']),
 Row(id='1', tags=['アメリカ', 'ダニエル', 'キイス', '1959年']),
 Row(id='5', tags=['イギリス', 'ジェイムズ', 'ホーガン', '1977年']),
 Row(id='4', tags=['フランス', 'ジュール', 'ヴェルヌ', '1864年']),
 Row(id='2', tags=['アメリカ', 'ロバート', 'ハインライン', '1966年'])]

flatMapを用いてexplode

explodeの代わりにflatMapを用いて展開してみる。

df_e = df.rdd.flatMap(lambda row: [(row[0], tag) for tag in row[2]])
df.take(10)

結果

[('1', 'アメリカ'),
 ('1', 'ダニエル'),
 ('1', 'キイス'),
 ('1', '1959年'),
 ('2', 'アメリカ'),
 ('2', 'ロバート'),
 ('2', 'ハインライン'),
 ('2', '1966年'),
 ('3', 'ロシア'),
 ('3', 'アイザック')]

mapPartitionsを用いてexplode

mapPartitionsはpartitionごとに一括で処理を噛ませることができる。

tagの各要素でyieldすればexplodeやflatMapと同じように1行を複数行に変換できる。

from pyspark.sql import Row
def explode(rows):
    for row in rows:
        for tag in row[2]:
            yield Row(id=row[0], tag=tag)

df_e = df.rdd.mapPartitions(explode)
df_e.take(10)

結果

[Row(id='1', tag='アメリカ'),
 Row(id='1', tag='ダニエル'),
 Row(id='1', tag='キイス'),
 Row(id='1', tag='1959年'),
 Row(id='2', tag='アメリカ'),
 Row(id='2', tag='ロバート'),
 Row(id='2', tag='ハインライン'),
 Row(id='2', tag='1966年'),
 Row(id='3', tag='ロシア'),
 Row(id='3', tag='mapPartitions')]

yieldする前の段階で条件判定を入れたり複雑な処理が書けるので、込み入ったロジックを書かないといけない時にお世話になる。

改定履歴

Author: Masato Watanabe, Date: 2019-03-07, 記事投稿