概要
PySparkで作成したUDFをPythonやSparkの経験がない人にも使ってもらいたい時は、registerしてSQLで呼び出せるようにするとハードルが下がる。
本稿はpysparkでUDFを書く、spark.udf.registerでUDFを登録する、クエリから呼び出す、といった一連の流れを実践する。また実行結果をファイル出力する方法についても合わせて扱っている。
バージョン情報
- Python 3.7.3
- Spark 2.4.3
サンプルのUDF
例として配列を受け取って平均、標準偏差、中央値を返すUDFを作る。標準ライブラリのみで実装。
引数に数値のArrayを受け取り、戻り値に{String: Double}のMapを返す。
import statistics from pyspark.sql import types as T, functions as F def describe(arr): mean = statistics.mean(arr) std = statistics.stdev(arr) median = statistics.median(arr) return {'mean': mean, 'std': std, 'median': median} # 戻り値をMap<String, Double>にしてudfを作成 describe_udf = F.udf(describe, T.MapType(T.StringType(), T.DoubleType()))
サンプルデータ
UDFに渡すサンプルデータを作っておく。型を残したいのでjsonでファイル生成。
import numpy as np import pandas as pd df = pd.DataFrame(index=range(10), columns=['data']) df['data'] = df.data.apply(lambda x: np.random.random(10)) df.to_json('foo.json', orient='records')
これでdataというカラム名で10個の数値を持つjsonが生成された。
このjsonをどこかSparkから読める場所に配置してUDFを試す。
PySparkでUDFの呼び出し
まずはSpark側でファイルの読込み。
df = spark.read.json('foo.json') df.take(3) #=> [Row(data=[0.2588221656, 0.5420820042, 0.0920467156, 0.8355928241, 0.8176913772, 0.1500472307, 0.4949689105, 0.6446566978, 0.6865406183, 0.3963092514]), #=> Row(data=[0.2052692841, 0.0229949015, 0.801438586, 0.8056066628, 0.090603724, 0.4647900515, 0.8701400697, 0.9887174155, 0.171965076, 0.5817299883]), #=> Row(data=[0.9524736957, 0.9447458733, 0.2356961786, 0.2875851369, 0.2687000525, 0.5162268306, 0.1720925364, 0.7062852154, 0.5671749945, 0.6132047333])]
withColumnでUDFを呼び出してみる。
df.withColumn('desc', describe_udf(F.col('data'))).select('desc').take(3) #=> [Row(desc={'median': 0.51852545735, 'std': 0.26420127780236047, 'mean': 0.49187577954}), #=> Row(desc={'median': 0.5232600199, 'std': 0.3586455677511948, 'mean': 0.50032557594}), #=> Row(desc={'median': 0.54170091255, 'std': 0.28513080794533796, 'mean': 0.52641852472})]
dataカラムの中に入った配列のmedia, std, meanが取得できた。
これをクエリだけでできるようにしたい。
SQLからudfを呼び出せるよう登録する
DataFrameにクエリからアクセスできるようにjsonファイルの情報をtemp tableにregisterしておく。
df = spark.read.json('foo.json') df.repartition(1).registerTempTable('tmp_foo_json')
UDFをregisterする。
spark.udf.register("describe", describe_udf)
クエリを実行。
sql = 'select describe(data) as desc from tmp_foo_json' spark.sql(sql).take(3)
実行結果
[Row(desc={'median': 0.51852545735, 'std': 0.26420127780236047, 'mean': 0.49187577954}), Row(desc={'median': 0.5232600199, 'std': 0.3586455677511948, 'mean': 0.50032557594}), Row(desc={'median': 0.54170091255, 'std': 0.28513080794533796, 'mean': 0.52641852472})]
クエリの結果をHDFSに出力
cliでhiveを叩く場合はinsert overwrite directoryしたり標準出力をそのままリダイレクトしたりして結果をファイルに出すことがある。
PySParkを利用する場合もinsert overwriteでHDFS上に結果を出力できる。
下記は区切り文字等も指定しつつクエリを実行した場合。
sql = ''' insert overwrite directory 'query_result' row format delimited fields terminated by '\t' collection items terminated by ',' map keys terminated by ':' select describe(data) as desc from tmp_foo_json ''' spark.sql(sql)
実行結果
$ hadoop fs -text query_result/* | head -3 median:0.51852545735,std:0.26420127780236047,mean:0.49187577954 median:0.5232600199,std:0.3586455677511948,mean:0.50032557594 median:0.54170091255,std:0.28513080794533796,mean:0.52641852472
クエリの結果をローカルに出力
集計結果などの小さなデータはわざわざHDFSには出さずにそのままローカルに出力してしまいたいことも多い。
toPandasでlocalのpandasのDataFrameに変換し、ファイルに出力してみる。DataFrameの内容が一時的にメモリに読み込まれるのであまり大きい実行結果に対しては利用できない。
sql = 'select describe(data) as desc from tmp_foo_json' df = spark.sql(sql) local_df = df.toPandas() # 何かしらの形式で保存 local_df.to_pickle('(desc.pickle')
改定履歴
Author: Masato Watanabe, Date: 2019-06-15, 記事投稿