概要
PySparkはUDF/UDAFが手軽に書けて便利。
バージョン情報
- Python 3.7.3
- Spark 2.4.3
サンプルデータ
カンマ区切りの文字列が入ったカラムを用意して、それをsplitしたり戻したりといった処理をudfで実行してみる。
サンプルデータとしてA, B, Cの3つのカラム(ランダムな値が入る)を持つDataFrameを生成する。
import pandas as pd import numpy as np np.random.seed(0) # 1万行くらいのデータを用意 pandas_df = pd.DataFrame( np.random.random([10000, 3]), columns=['A', 'B', 'C']) # SparkのDataFrameに変換して5件を表示 df = spark.createDataFrame(pandas_df) df.take(5)
実行結果
[Row(A=0.5488135039273248, B=0.7151893663724195, C=0.6027633760716439), Row(A=0.5448831829968969, B=0.4236547993389047, C=0.6458941130666561), Row(A=0.4375872112626925, B=0.8917730007820798, C=0.9636627605010293), Row(A=0.3834415188257777, B=0.7917250380826646, C=0.5288949197529045), Row(A=0.5680445610939323, B=0.925596638292661, C=0.07103605819788694)]
UDF
渡された引数に+1して返すUDFを動かしてみる。
当該条件に沿ったPythonの関数を記述して、udf(関数名, 戻り値の型)を宣言するだけ。
from pyspark.sql import types as T, functions as F # +1する関数 def plus1(v): return v + 1 # UDFの宣言。引数1は関数、引数2は戻り値の型を渡す。 plus1_udf = F.udf(plus1, T.DoubleType()) # UDFを呼び出す tmp_df = df.withColumn('A_1', plus1_udf('A')) tmp_df.select('A', 'A_1').take(5)
結果
[Row(A=0.5488135039273248, A_1=1.5488135039273248), Row(A=0.5448831829968969, A_1=1.5448831829968968), Row(A=0.4375872112626925, A_1=1.4375872112626924), Row(A=0.3834415188257777, A_1=1.3834415188257778), Row(A=0.5680445610939323, A_1=1.5680445610939322)]
PySparkのUDFはこうした軽いロジックが入る処理をとても簡単に書ける。
生成したUDFはクエリから呼び出すこともできる。
デコレータによるUDFの宣言
上記では関数を記述してから別途udfを宣言した。
デコレータで宣言することもできる。
# デコレータで指定してUDFを宣言 @F.udf(T.DoubleType()) def plus2_udf(v): return v + 2 tmp_df = df.withColumn('A_2', plus2_udf('A')) tmp_df.select('A', 'A_2').take(5)
[Row(A=0.5488135039273248, A_2=2.5488135039273248), Row(A=0.5448831829968969, A_2=2.5448831829968968), Row(A=0.4375872112626925, A_2=2.4375872112626924), Row(A=0.3834415188257777, A_2=2.383441518825778), Row(A=0.5680445610939323, A_2=2.568044561093932)]
こちらの方が記述としては簡易だけど、PySparkからもローカルのPythonからも同じ処理が呼びたくなる可能性がある場合は、分けて宣言した方が良いこともある。
複数のカラムを受け取るUDF
UDFでは複数のカラムを引数に取ることもできる。
例としてA, B, Cを引数に取って標準偏差を取るUDFを記述する。
# 標準偏差を取るUDFを宣言 @F.udf(T.DoubleType()) def std_udf(a, b, c): return float(np.std([a, b, c])) # 呼び出す df = df.withColumn('std', std_udf('A', 'B', 'C')) df.select('std').take(5)
実行結果
[Row(std=0.06930698185436182), Row(std=0.09085387630944602), Row(std=0.23290635992943487), Row(std=0.16896146669163353), Row(std=0.35041796074465936)]
コードではnumpy.stdした結果をfloatに変換している。変換しないと型の違いで下記のように怒られる。
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
Types
上記ではDoubleTypeを返したが、StringType, IntegerType, LongType, BooleanType, BinaryType, DateType, FloatType, TimestampType, NullType, MapType, StructTypeなど、各種タイプが用意されている。
https://spark.apache.org/docs/2.4.3/api/python/_modules/pyspark/sql/types.html
Typesの指定によってある程度は暗黙的にキャストを行ってくれる。
たとえば+1するudfの型指定をStringTypeにした場合。
@F.udf(T.StringType()) def plus1_udf(v): return v + 1 tmp_df = df.withColumn('A_1', plus1_udf('A')) tmp_df.select('A', 'A_1').take(5)
実行結果。A_1が文字列になっている。
[Row(A=0.5488135039273248, A_1='1.5488135039273248'), Row(A=0.5448831829968969, A_1='1.5448831829968968'), Row(A=0.4375872112626925, A_1='1.4375872112626924'), Row(A=0.3834415188257777, A_1='1.3834415188257778'), Row(A=0.5680445610939323, A_1='1.5680445610939322')]
また変換ができない場合はNoneになる。下記はIntegerTypeを指定しているが、Double → Integerはサポートしていない為、castに失敗してNoneが返っている。
@F.udf(T.IntegerType()) def plus1_udf(v): return v + 1 tmp_df = df.withColumn('A_1', plus1_udf('A')) tmp_df.select('A', 'A_1').take(5)
[Row(A=0.5488135039273248, A_1=None), Row(A=0.5448831829968969, A_1=None), Row(A=0.4375872112626925, A_1=None), Row(A=0.3834415188257777, A_1=None), Row(A=0.5680445610939323, A_1=None)]
MapTypeを返すUDF
MapTypeはKeyとValueそれぞれのTypeも指定する。
下記は{'A': Aの値, 'B': Bの値}でMapを生成している。
@F.udf(T.MapType(T.StringType(), T.DoubleType())) def to_map_udf(a, b): return {'A': a, 'B': b} df.withColumn('AB', to_map_udf('A', 'B')) \ .select('AB') \ .take(5)
結果
[Row(AB={'A': 0.5488135039273248, 'B': 0.7151893663724195}), Row(AB={'A': 0.5448831829968969, 'B': 0.4236547993389047}), Row(AB={'A': 0.4375872112626925, 'B': 0.8917730007820798}), Row(AB={'A': 0.3834415188257777, 'B': 0.7917250380826646}), Row(AB={'A': 0.5680445610939323, 'B': 0.925596638292661})]
pandas_udf
@pandas_udfを指定することでvectorizeされたデータを渡して高速にnumpyやpandasで処理をすることができる。
pandas_udfに渡す引数は3つ。
pandas_udf(f=None, returnType=None, functionType=None)
fがfunction。returnTypeが戻り値の型。functionTypeが引数のタイプ。
引数のタイプには以下の3つが指定できる。
タイプ | 引数に渡される型 |
---|---|
SCALAR | pandasのSeries |
GROUPED_MAP | groupbyされたDataFrame |
GROUPED_AGG | groupbyされたSeries |
SCALAR
先ほどのnp.stdした際のコードをpandas_udf版で書いてみる。
a, b, cはそれぞれSeriesで渡されるので、DataFrameに直してstdを取ってみる。
@F.pandas_udf(returnType="double") def std_udf(a, b, c): return pd.DataFrame([a, b, c]).std() # 呼び出す tmp_df = df.withColumn('std', std_udf('A', 'B', 'C')) tmp_df.select('std').take(5)
実行結果
[Row(std=0.08488337057775956), Row(std=0.1112728190560398), Row(std=0.28525086983805886), Row(std=0.20693468979337898), Row(std=0.42917260026552073)]
pandasでいうところのこういう処理をしている。
pd.DataFrame([pandas_df.A, pandas_df.B, pandas_df.C]).std()
GROUPED_MAP
GROUPED_MAPはgroup byしたDataFrameを引数に取り、Schemaで設定した値を返します。
例としてカラムAの小数点1桁の値でgroup byしてA列, B列, C列それぞれのmeanを返す処理を実行してみる。
groupbyしやすいカラムを今回のDataFrameは持っていないので、Aの値の小数点1桁をIntを作って0〜9の値でグループが作れるようにする。
tenx_int_udf = F.udf(lambda x: int(x * 10), T.IntegerType()) tmp_df = df.withColumn('A_INT', tenx_int_udf('A')) tmp_df.select('A', 'A_INT').take(5)
実行結果
[Row(A=0.5488135039273248, A_INT=5), Row(A=0.5448831829968969, A_INT=5), Row(A=0.4375872112626925, A_INT=4), Row(A=0.3834415188257777, A_INT=3), Row(A=0.5680445610939323, A_INT=5)]
pandas_udfを記述し、追加されたA_INTでgroup byしてapplyしてみる。
# Schemaの設定 schema = T.StructType([ T.StructField('A', T.DoubleType()), T.StructField('B', T.DoubleType()), T.StructField('C', T.DoubleType()), T.StructField('A_INT', T.IntegerType()) ]) # 各カラムのmeanを取って返す @F.pandas_udf(returnType=schema, functionType=F.PandasUDFType.GROUPED_MAP) def stats_udf(pdf): mean = pdf.mean() return pd.DataFrame(mean).T # ソートして結果を表示 tmp_df = tmp_df.groupby('A_INT').apply(stats_udf) tmp_df.orderBy('A_INT').collect()
実行結果
[Row(A=0.049572991832265344, B=0.48636115573217087, C=0.49684142546384646, A_INT=0), Row(A=0.1485495542178649, B=0.4879194026859281, C=0.48570094684084514, A_INT=1), Row(A=0.24968414766986902, B=0.4994563633088348, C=0.501692179056708, A_INT=2), Row(A=0.35009481926917224, B=0.5036896862925305, C=0.4890935979154168, A_INT=3), Row(A=0.4491004446873258, B=0.5048263261034505, C=0.4939256054116147, A_INT=4), Row(A=0.5500180340896127, B=0.4871415585766198, C=0.5038413218268272, A_INT=5), Row(A=0.651505854442605, B=0.4885796294445262, C=0.5044430265925124, A_INT=6), Row(A=0.7511341385968157, B=0.4977673756360947, C=0.505599177518435, A_INT=7), Row(A=0.8500983501029348, B=0.5003624963995137, C=0.4937047504740419, A_INT=8), Row(A=0.949296647455288, B=0.49326581500608757, C=0.49799167586781556, A_INT=9)]
上記ではスキーマを1つ1つ指定したが、簡易記法で記述することもできる。
schema = 'A double, B double, C double, A_INT int' @F.pandas_udf(returnType=schema, functionType=F.PandasUDFType.GROUPED_MAP) def stats_udf(pdf): mean = pdf.mean() return pd.DataFrame(mean).T # ソートして結果を表示 tmp_df = tmp_df.groupby('A_INT').apply(stats_udf) tmp_df.orderBy('A_INT').collect()
group byされたデータはDataFrameとして渡されるので、データ量によってはメモリに気をつけて処理を行わないといけない。どの程度の規模のデータになるかわからない処理については使いづらい。
GROUPED_AGG
GROUPED_MAPはgroupのDataFrameをudfに渡したが、GROUPED_AGGはgroupのschemaをDataFrameに渡して
pandasで書くとこういう差。
# GROUPED_MAP pandas_df.groupby('A_INT').apply(f) # GROUPED_AGG pandas_df.groupby('A_INT').column_a.agg(f)
GROUPED_AGGでgroup byしたカラムAの平均を取ってみる。
tenx_int_udf = F.udf(lambda x: int(x * 10), T.IntegerType()) tmp_df = df.withColumn('A_INT', tenx_int_udf('A')) tmp_df.select('A', 'A_INT').take(5) @F.pandas_udf(returnType=T.DoubleType(), functionType=F.PandasUDFType.GROUPED_AGG) def stats_udf(pdf): return pdf.mean() # ソートして結果を表示 tmp_df.groupby('A_INT').agg(stats_udf('A')) \ .collect()
実行結果
[Row(A_INT=1, stats_udf(A)=0.14854955421786484), Row(A_INT=6, stats_udf(A)=0.651505854442605), Row(A_INT=3, stats_udf(A)=0.3500948192691723), Row(A_INT=4, stats_udf(A)=0.4491004446873261), Row(A_INT=7, stats_udf(A)=0.7511341385968164), Row(A_INT=0, stats_udf(A)=0.04957299183226535), Row(A_INT=8, stats_udf(A)=0.8500983501029343), Row(A_INT=5, stats_udf(A)=0.5500180340896124), Row(A_INT=2, stats_udf(A)=0.24968414766986913), Row(A_INT=9, stats_udf(A)=0.9492966474552892)]
カラム名がudf呼び出しの記述になってしまった。
aliasで名前を指定しておく。
tmp_df.groupby('A_INT').agg(stats_udf('A').alias('A_MEAN')) \ .collect()
実行結果
[Row(A_INT=1, A_MEAN=0.14854955421786484), Row(A_INT=6, A_MEAN=0.651505854442605), Row(A_INT=3, A_MEAN=0.35009481926917224), Row(A_INT=4, A_MEAN=0.44910044468732613), Row(A_INT=7, A_MEAN=0.7511341385968165), Row(A_INT=0, A_MEAN=0.04957299183226535), Row(A_INT=8, A_MEAN=0.8500983501029343), Row(A_INT=5, A_MEAN=0.5500180340896124), Row(A_INT=2, A_MEAN=0.24968414766986913), Row(A_INT=9, A_MEAN=0.9492966474552892)]
改定履歴
Author: Masato Watanabe, Date: 2019-07-02, 記事投稿