iMind Developers Blog

iMind開発者ブログ

PySparkでarrayのカラムに対してbroadcast join

概要

例えばMovieLensのデータで各ユーザーがどの映画を見たかを、movieIdのArrayで持っているテーブルがあったとする。

# userIdと映画のidの配列を持つDataFrame
df.take(3)
    #=> [Row(userId='134990', movieIds=['2793', '743', '2572', '4159', '59369', ...
    #=>  Row(userId='23947', movieIds=['4369', '1722', '2580', '4886', '5679',  ...
    #=>  Row(userId='55167', movieIds=['110', '50', '1012', '150', '2710', '3081',  ...

このIdのArrayを、紐づく映画タイトルのArrayに変換したかったとする。

# userIdと映画のタイトルの配列を持つDataFrame
df.take(3)
    #=> [Row(userId='134990', titles=['6th Day, The (2000)', 'Tank Girl (1995)', 'Space Cowboys (2000)', ...
    #=> Row(userId='23947', titles=['Remember the Titans (2000)', 'Crossroads (2002)', 'Entrapment (1999)', ...
    #=> Row(userId='22543', titles=['Gandhi (1982)', 'Enemy at the Gates (2001)', 'Stranger than Fiction (2006)', ...

explodeしてjoinしても良いけど、より負荷が低そうなbroadcastして紐付ける形で実装してみる。

サンプルデータ

MovieLensのratings.csvをuserIdでgroupByして、userId(String), movieIds(Array[Int])のDataFrameを作る。

from pyspark.sql import functions as F

df = spark.read \
    .option('header', 'true') \
    .csv('ml-20m/ratings.csv')

df = df.groupBy('userId') \
    .agg(F.collect_set('movieId').alias('movieIds'))

df.take(3)
    #=> [Row(userId='134990', movieIds=['2793', '743', '2572', '4159', '59369', ...
    #=>  Row(userId='23947', movieIds=['4369', '1722', '2580', '4886', '5679',  ...
    #=>  Row(userId='55167', movieIds=['110', '50', '1012', '150', '2710', '3081',  ...

また、これにjoinしてタイトルに変換する為に、映画のIDとタイトルを持つmovies.csvも読み込んでおく。

movies = spark.read \
    .option('header', 'true') \
    .csv('ml-20m/movies.csv') \
    .select('movieId', 'title')

movies.take(3)
    #=> [Row(movieId='1', title='Toy Story (1995)'),
    #=>  Row(movieId='2', title='Jumanji (1995)'),
    #=>  Row(movieId='3', title='Grumpier Old Men (1995)')]

この2つのDataFrameを使って変換処理を実装する。

explode, join, groupByで実行する場合

movieIdsをexplodeして、タイトルとjoinして、再度groupByするのが一般的なやり方だと思われるので、まずはそちらの方法で実行してみる。

まずはexplodeしてuserIdとmovieIdの一覧にする。

from pyspark.sql import functions as F

df = df.select('userId', F.explode('movieIds').alias('movieId'))
# df.take(3)
    #=> [Row(userId='134990', movieId='2793'),
    #=>  Row(userId='134990', movieId='743'),
    #=>  Row(userId='134990', movieId='2572')]

joinする。movieIdは消してuserIdとmovies.titleの2つだけのカラムにする。

df = df.join(movies, df.movieId == movies.movieId) \
    .select(df.userId, movies.title)
# df.take(3)
    #=> [Row(userId='134990', title='American Werewolf in Paris, An (1997)'),
    #=>  Row(userId='134990', title='Spy Hard (1996)'),
    #=>  Row(userId='134990', title='10 Things I Hate About You (1999)')]

最後にgroupByしてArrayにタイトル一覧が入った結果を生成する。

df = df.groupBy('userId') \
    .agg(F.collect_set('title').alias('titles'))
df.take(3)
    #=> [Row(userId='134990', titles=['6th Day, The (2000)', 'Tank Girl (1995)', 'Space Cowboys (2000)', ...
    #=> Row(userId='23947', titles=['Remember the Titans (2000)', 'Crossroads (2002)', 'Entrapment (1999)', ...
    #=> Row(userId='22543', titles=['Gandhi (1982)', 'Enemy at the Gates (2001)', 'Stranger than Fiction (2006)', ...

これでも結果は取れるけど大規模データになった際にリソースを多めに喰ってしまうことがある。

dictionaryに変換してbroadcastして変換

moviesのidとtitleはたかだか1.4MBなのでbroadcastしても多分セーフ。

id => titleのdictionaryに変換してbroadcastし、mapPartitionsでmovieIdsの中身を変換してみる。

まずはdictionaryのbroadcast。

# 辞書に変換
title_dic = movies.toPandas().set_index('movieId').title.to_dict()
# list(title_dic.items())[0:3]
    #=> [('1', 'Toy Story (1995)'),
    #=>  ('2', 'Jumanji (1995)'),
    #=>  ('3', 'Grumpier Old Men (1995)')]

# broadcast
bc_title_dic = spark.sparkContext.broadcast(title_dic)

broadcastした情報を利用して地道に変換作業。

# 変換処理
def bc_map_join(rows, bc_title_dic):
    title_dic = bc_title_dic.value
    for row in rows:
        titles = [title_dic[movieId] for movieId in row.movieIds]
        yield (row.userId, titles)

# mapPartitionsで変換処理を呼び出す
df = df.rdd.mapPartitions(lambda rows: bc_map_join(rows, bc_title_dic)) \
    .toDF(['userId', 'titles'])
# df.take(3)
    #=> [Row(userId='134990', titles=['American Werewolf in Paris, An (1997)', 'Spy Hard (1996)',  ...
    #=>  Row(userId='23947', titles=['Fast and the Furious, The (2001)', 'Tomorrow Never Dies (1997)', ...
    #=> Row(userId='55167', titles=['Braveheart (1995)', 'Usual Suspects, The (1995)', 'Old Yeller (1957)',  ...

できた。これでデータの規模が大きくなってもリソースの消費を抑えられそう。

分割してbroadcastして変換

上記の方法だとユーザーのセットはいくら大きくなっても大丈夫だけど、映画タイトルが増加していくとbroadcastで乗り切らないサイズになってしまう可能性が考えられる。

そこでid=>titleの一覧を5分割して少しずつ変換するコードも書いてみる。若干強引だけど動作はしてくれた。

import math

# 5ページに分割する
PAGE_COUNT = 5

# moviesのレコード数
movie_count = movies.count()
    #=> 27278

# 1ページごとのレコード数
page_size = math.floor(movie_count / PAGE_COUNT) + 1
    #=> 5456

# zipWithIndexでmoviesに行番号を振る
movies_rdd = movies.rdd.zipWithIndex()

# join先のデータをrddにしておく
df_rdd = df.rdd

# broadcastしたデータでidをtitleに変換する処理
def bc_map_join(rows, bc_title_dic):
    title_dic = bc_title_dic.value
    for row in rows:
        titles = []
        # 既に変換済みの情報があったらそれに加算していく
        if 'titles' in row:
            titles = row.titles
        titles += list(filter(None, [title_dic.get(movieId) for movieId in row.movieIds]))
        yield pyspark.sql.Row(userId=row.userId, movieIds=row.movieIds, titles=list(set(titles)))

# 1ページ毎に上の紐付け処理を呼び出す
for i in range(PAGE_COUNT):
    start = page_size * i
    end = page_size * (i + 1)
    page = movies_rdd.filter(lambda row_idx: row_idx[1] >= start and row_idx[1] < end)
    # 分割した分だけローカルのpandasに落とす
    page_df = page.map(lambda row_idx: row_idx[0]).toDF().toPandas()
    # 分割分をbroadcast
    title_dic = page_df.set_index('movieId').title.to_dict()
    bc_title_dic = spark.sparkContext.broadcast(title_dic)
    # ここでページごとの変換処理を呼び出す(broadcastが除去される前に明示的に実行)
    df_rdd = df_rdd.mapPartitions(lambda rows: bc_map_join(rows, bc_title_dic)).persist()
    df_rdd.take(1)
    # broadcastしたデータの除去
    bc_title_dic.unpersist()

df = df_rdd.toDF().select('userId', 'titles')
# df.take(3)
    #=> [Row(userId='111710', titles=['Pearl Harbor (2001)', 'Three Kings (1999)', 'Unbreakable (2000)', ...
    #=>  Row(userId='116194', titles=['River Wild, The (1994)', 'French Kiss (1995)', ...
    #=>  Row(userId='117211', titles=['Toy Story (1995)', 'Star Trek: First Contact (1996)',  ...

もうちょっといい書き方があると思います。。。

改定履歴

Author: Masato Watanabe, Date: 2019-09-01, 記事投稿