概要
例えばMovieLensのデータで各ユーザーがどの映画を見たかを、movieIdのArrayで持っているテーブルがあったとする。
# userIdと映画のidの配列を持つDataFrame df.take(3) #=> [Row(userId='134990', movieIds=['2793', '743', '2572', '4159', '59369', ... #=> Row(userId='23947', movieIds=['4369', '1722', '2580', '4886', '5679', ... #=> Row(userId='55167', movieIds=['110', '50', '1012', '150', '2710', '3081', ...
このIdのArrayを、紐づく映画タイトルのArrayに変換したかったとする。
# userIdと映画のタイトルの配列を持つDataFrame df.take(3) #=> [Row(userId='134990', titles=['6th Day, The (2000)', 'Tank Girl (1995)', 'Space Cowboys (2000)', ... #=> Row(userId='23947', titles=['Remember the Titans (2000)', 'Crossroads (2002)', 'Entrapment (1999)', ... #=> Row(userId='22543', titles=['Gandhi (1982)', 'Enemy at the Gates (2001)', 'Stranger than Fiction (2006)', ...
explodeしてjoinしても良いけど、より負荷が低そうなbroadcastして紐付ける形で実装してみる。
サンプルデータ
MovieLensのratings.csvをuserIdでgroupByして、userId(String), movieIds(Array[Int])のDataFrameを作る。
from pyspark.sql import functions as F df = spark.read \ .option('header', 'true') \ .csv('ml-20m/ratings.csv') df = df.groupBy('userId') \ .agg(F.collect_set('movieId').alias('movieIds')) df.take(3) #=> [Row(userId='134990', movieIds=['2793', '743', '2572', '4159', '59369', ... #=> Row(userId='23947', movieIds=['4369', '1722', '2580', '4886', '5679', ... #=> Row(userId='55167', movieIds=['110', '50', '1012', '150', '2710', '3081', ...
また、これにjoinしてタイトルに変換する為に、映画のIDとタイトルを持つmovies.csvも読み込んでおく。
movies = spark.read \ .option('header', 'true') \ .csv('ml-20m/movies.csv') \ .select('movieId', 'title') movies.take(3) #=> [Row(movieId='1', title='Toy Story (1995)'), #=> Row(movieId='2', title='Jumanji (1995)'), #=> Row(movieId='3', title='Grumpier Old Men (1995)')]
この2つのDataFrameを使って変換処理を実装する。
explode, join, groupByで実行する場合
movieIdsをexplodeして、タイトルとjoinして、再度groupByするのが一般的なやり方だと思われるので、まずはそちらの方法で実行してみる。
まずはexplodeしてuserIdとmovieIdの一覧にする。
from pyspark.sql import functions as F df = df.select('userId', F.explode('movieIds').alias('movieId')) # df.take(3) #=> [Row(userId='134990', movieId='2793'), #=> Row(userId='134990', movieId='743'), #=> Row(userId='134990', movieId='2572')]
joinする。movieIdは消してuserIdとmovies.titleの2つだけのカラムにする。
df = df.join(movies, df.movieId == movies.movieId) \ .select(df.userId, movies.title) # df.take(3) #=> [Row(userId='134990', title='American Werewolf in Paris, An (1997)'), #=> Row(userId='134990', title='Spy Hard (1996)'), #=> Row(userId='134990', title='10 Things I Hate About You (1999)')]
最後にgroupByしてArrayにタイトル一覧が入った結果を生成する。
df = df.groupBy('userId') \ .agg(F.collect_set('title').alias('titles')) df.take(3) #=> [Row(userId='134990', titles=['6th Day, The (2000)', 'Tank Girl (1995)', 'Space Cowboys (2000)', ... #=> Row(userId='23947', titles=['Remember the Titans (2000)', 'Crossroads (2002)', 'Entrapment (1999)', ... #=> Row(userId='22543', titles=['Gandhi (1982)', 'Enemy at the Gates (2001)', 'Stranger than Fiction (2006)', ...
これでも結果は取れるけど大規模データになった際にリソースを多めに喰ってしまうことがある。
dictionaryに変換してbroadcastして変換
moviesのidとtitleはたかだか1.4MBなのでbroadcastしても多分セーフ。
id => titleのdictionaryに変換してbroadcastし、mapPartitionsでmovieIdsの中身を変換してみる。
まずはdictionaryのbroadcast。
# 辞書に変換 title_dic = movies.toPandas().set_index('movieId').title.to_dict() # list(title_dic.items())[0:3] #=> [('1', 'Toy Story (1995)'), #=> ('2', 'Jumanji (1995)'), #=> ('3', 'Grumpier Old Men (1995)')] # broadcast bc_title_dic = spark.sparkContext.broadcast(title_dic)
broadcastした情報を利用して地道に変換作業。
# 変換処理 def bc_map_join(rows, bc_title_dic): title_dic = bc_title_dic.value for row in rows: titles = [title_dic[movieId] for movieId in row.movieIds] yield (row.userId, titles) # mapPartitionsで変換処理を呼び出す df = df.rdd.mapPartitions(lambda rows: bc_map_join(rows, bc_title_dic)) \ .toDF(['userId', 'titles']) # df.take(3) #=> [Row(userId='134990', titles=['American Werewolf in Paris, An (1997)', 'Spy Hard (1996)', ... #=> Row(userId='23947', titles=['Fast and the Furious, The (2001)', 'Tomorrow Never Dies (1997)', ... #=> Row(userId='55167', titles=['Braveheart (1995)', 'Usual Suspects, The (1995)', 'Old Yeller (1957)', ...
できた。これでデータの規模が大きくなってもリソースの消費を抑えられそう。
分割してbroadcastして変換
上記の方法だとユーザーのセットはいくら大きくなっても大丈夫だけど、映画タイトルが増加していくとbroadcastで乗り切らないサイズになってしまう可能性が考えられる。
そこでid=>titleの一覧を5分割して少しずつ変換するコードも書いてみる。若干強引だけど動作はしてくれた。
import math # 5ページに分割する PAGE_COUNT = 5 # moviesのレコード数 movie_count = movies.count() #=> 27278 # 1ページごとのレコード数 page_size = math.floor(movie_count / PAGE_COUNT) + 1 #=> 5456 # zipWithIndexでmoviesに行番号を振る movies_rdd = movies.rdd.zipWithIndex() # join先のデータをrddにしておく df_rdd = df.rdd # broadcastしたデータでidをtitleに変換する処理 def bc_map_join(rows, bc_title_dic): title_dic = bc_title_dic.value for row in rows: titles = [] # 既に変換済みの情報があったらそれに加算していく if 'titles' in row: titles = row.titles titles += list(filter(None, [title_dic.get(movieId) for movieId in row.movieIds])) yield pyspark.sql.Row(userId=row.userId, movieIds=row.movieIds, titles=list(set(titles))) # 1ページ毎に上の紐付け処理を呼び出す for i in range(PAGE_COUNT): start = page_size * i end = page_size * (i + 1) page = movies_rdd.filter(lambda row_idx: row_idx[1] >= start and row_idx[1] < end) # 分割した分だけローカルのpandasに落とす page_df = page.map(lambda row_idx: row_idx[0]).toDF().toPandas() # 分割分をbroadcast title_dic = page_df.set_index('movieId').title.to_dict() bc_title_dic = spark.sparkContext.broadcast(title_dic) # ここでページごとの変換処理を呼び出す(broadcastが除去される前に明示的に実行) df_rdd = df_rdd.mapPartitions(lambda rows: bc_map_join(rows, bc_title_dic)).persist() df_rdd.take(1) # broadcastしたデータの除去 bc_title_dic.unpersist() df = df_rdd.toDF().select('userId', 'titles') # df.take(3) #=> [Row(userId='111710', titles=['Pearl Harbor (2001)', 'Three Kings (1999)', 'Unbreakable (2000)', ... #=> Row(userId='116194', titles=['River Wild, The (1994)', 'French Kiss (1995)', ... #=> Row(userId='117211', titles=['Toy Story (1995)', 'Star Trek: First Contact (1996)', ...
もうちょっといい書き方があると思います。。。
改定履歴
Author: Masato Watanabe, Date: 2019-09-01, 記事投稿