iMind Developers Blog

iMind開発者ブログ

PySparkでUDFを書く

概要

PySparkはUDF/UDAFが手軽に書けて便利。

バージョン情報

  • Python 3.7.3
  • Spark 2.4.3

サンプルデータ

カンマ区切りの文字列が入ったカラムを用意して、それをsplitしたり戻したりといった処理をudfで実行してみる。

サンプルデータとしてA, B, Cの3つのカラム(ランダムな値が入る)を持つDataFrameを生成する。

import pandas as pd
import numpy as np

np.random.seed(0)

# 1万行くらいのデータを用意
pandas_df = pd.DataFrame(
    np.random.random([10000, 3]),
    columns=['A', 'B', 'C'])

# SparkのDataFrameに変換して5件を表示
df = spark.createDataFrame(pandas_df)
df.take(5)

実行結果

[Row(A=0.5488135039273248, B=0.7151893663724195, C=0.6027633760716439),
 Row(A=0.5448831829968969, B=0.4236547993389047, C=0.6458941130666561),
 Row(A=0.4375872112626925, B=0.8917730007820798, C=0.9636627605010293),
 Row(A=0.3834415188257777, B=0.7917250380826646, C=0.5288949197529045),
 Row(A=0.5680445610939323, B=0.925596638292661, C=0.07103605819788694)]

UDF

渡された引数に+1して返すUDFを動かしてみる。

当該条件に沿ったPythonの関数を記述して、udf(関数名, 戻り値の型)を宣言するだけ。

from pyspark.sql import types as T, functions as F

# +1する関数
def plus1(v):
    return v + 1

# UDFの宣言。引数1は関数、引数2は戻り値の型を渡す。
plus1_udf = F.udf(plus1, T.DoubleType())

# UDFを呼び出す
tmp_df = df.withColumn('A_1', plus1_udf('A'))
tmp_df.select('A', 'A_1').take(5)

結果

[Row(A=0.5488135039273248, A_1=1.5488135039273248),
 Row(A=0.5448831829968969, A_1=1.5448831829968968),
 Row(A=0.4375872112626925, A_1=1.4375872112626924),
 Row(A=0.3834415188257777, A_1=1.3834415188257778),
 Row(A=0.5680445610939323, A_1=1.5680445610939322)]

PySparkのUDFはこうした軽いロジックが入る処理をとても簡単に書ける。

生成したUDFはクエリから呼び出すこともできる。

デコレータによるUDFの宣言

上記では関数を記述してから別途udfを宣言した。

デコレータで宣言することもできる。

# デコレータで指定してUDFを宣言
@F.udf(T.DoubleType())
def plus2_udf(v):
    return v + 2

tmp_df = df.withColumn('A_2', plus2_udf('A'))
tmp_df.select('A', 'A_2').take(5)
[Row(A=0.5488135039273248, A_2=2.5488135039273248),
 Row(A=0.5448831829968969, A_2=2.5448831829968968),
 Row(A=0.4375872112626925, A_2=2.4375872112626924),
 Row(A=0.3834415188257777, A_2=2.383441518825778),
 Row(A=0.5680445610939323, A_2=2.568044561093932)]

こちらの方が記述としては簡易だけど、PySparkからもローカルのPythonからも同じ処理が呼びたくなる可能性がある場合は、分けて宣言した方が良いこともある。

複数のカラムを受け取るUDF

UDFでは複数のカラムを引数に取ることもできる。

例としてA, B, Cを引数に取って標準偏差を取るUDFを記述する。

# 標準偏差を取るUDFを宣言
@F.udf(T.DoubleType())
def std_udf(a, b, c):
    return float(np.std([a, b, c]))

# 呼び出す
df = df.withColumn('std', std_udf('A', 'B', 'C'))
df.select('std').take(5)

実行結果

[Row(std=0.06930698185436182),
 Row(std=0.09085387630944602),
 Row(std=0.23290635992943487),
 Row(std=0.16896146669163353),
 Row(std=0.35041796074465936)]

コードではnumpy.stdした結果をfloatに変換している。変換しないと型の違いで下記のように怒られる。

net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)

Types

上記ではDoubleTypeを返したが、StringType, IntegerType, LongType, BooleanType, BinaryType, DateType, FloatType, TimestampType, NullType, MapType, StructTypeなど、各種タイプが用意されている。

https://spark.apache.org/docs/2.4.3/api/python/_modules/pyspark/sql/types.html

Typesの指定によってある程度は暗黙的にキャストを行ってくれる。

たとえば+1するudfの型指定をStringTypeにした場合。

@F.udf(T.StringType())
def plus1_udf(v):
    return v + 1

tmp_df = df.withColumn('A_1', plus1_udf('A'))
tmp_df.select('A', 'A_1').take(5)

実行結果。A_1が文字列になっている。

[Row(A=0.5488135039273248, A_1='1.5488135039273248'),
 Row(A=0.5448831829968969, A_1='1.5448831829968968'),
 Row(A=0.4375872112626925, A_1='1.4375872112626924'),
 Row(A=0.3834415188257777, A_1='1.3834415188257778'),
 Row(A=0.5680445610939323, A_1='1.5680445610939322')]

また変換ができない場合はNoneになる。下記はIntegerTypeを指定しているが、Double → Integerはサポートしていない為、castに失敗してNoneが返っている。

@F.udf(T.IntegerType())
def plus1_udf(v):
    return v + 1

tmp_df = df.withColumn('A_1', plus1_udf('A'))
tmp_df.select('A', 'A_1').take(5)
[Row(A=0.5488135039273248, A_1=None),
 Row(A=0.5448831829968969, A_1=None),
 Row(A=0.4375872112626925, A_1=None),
 Row(A=0.3834415188257777, A_1=None),
 Row(A=0.5680445610939323, A_1=None)]

MapTypeを返すUDF

MapTypeはKeyとValueそれぞれのTypeも指定する。

下記は{'A': Aの値, 'B': Bの値}でMapを生成している。

@F.udf(T.MapType(T.StringType(), T.DoubleType()))
def to_map_udf(a, b):
    return {'A': a, 'B': b}

df.withColumn('AB', to_map_udf('A', 'B')) \
        .select('AB') \
        .take(5)

結果

[Row(AB={'A': 0.5488135039273248, 'B': 0.7151893663724195}),
 Row(AB={'A': 0.5448831829968969, 'B': 0.4236547993389047}),
 Row(AB={'A': 0.4375872112626925, 'B': 0.8917730007820798}),
 Row(AB={'A': 0.3834415188257777, 'B': 0.7917250380826646}),
 Row(AB={'A': 0.5680445610939323, 'B': 0.925596638292661})]

pandas_udf

@pandas_udfを指定することでvectorizeされたデータを渡して高速にnumpyやpandasで処理をすることができる。

pandas_udfに渡す引数は3つ。

pandas_udf(f=None, returnType=None, functionType=None)

fがfunction。returnTypeが戻り値の型。functionTypeが引数のタイプ。

引数のタイプには以下の3つが指定できる。

タイプ 引数に渡される型
SCALAR pandasのSeries
GROUPED_MAP groupbyされたDataFrame
GROUPED_AGG groupbyされたSeries

SCALAR

先ほどのnp.stdした際のコードをpandas_udf版で書いてみる。

a, b, cはそれぞれSeriesで渡されるので、DataFrameに直してstdを取ってみる。

@F.pandas_udf(returnType="double")
def std_udf(a, b, c):
    return pd.DataFrame([a, b, c]).std()

# 呼び出す
tmp_df = df.withColumn('std', std_udf('A', 'B', 'C'))
tmp_df.select('std').take(5)

実行結果

[Row(std=0.08488337057775956),
 Row(std=0.1112728190560398),
 Row(std=0.28525086983805886),
 Row(std=0.20693468979337898),
 Row(std=0.42917260026552073)]

pandasでいうところのこういう処理をしている。

pd.DataFrame([pandas_df.A, pandas_df.B, pandas_df.C]).std()

GROUPED_MAP

GROUPED_MAPはgroup byしたDataFrameを引数に取り、Schemaで設定した値を返します。

例としてカラムAの小数点1桁の値でgroup byしてA列, B列, C列それぞれのmeanを返す処理を実行してみる。

groupbyしやすいカラムを今回のDataFrameは持っていないので、Aの値の小数点1桁をIntを作って0〜9の値でグループが作れるようにする。

tenx_int_udf = F.udf(lambda x: int(x * 10), T.IntegerType())
tmp_df = df.withColumn('A_INT', tenx_int_udf('A'))
tmp_df.select('A', 'A_INT').take(5)

実行結果

[Row(A=0.5488135039273248, A_INT=5),
 Row(A=0.5448831829968969, A_INT=5),
 Row(A=0.4375872112626925, A_INT=4),
 Row(A=0.3834415188257777, A_INT=3),
 Row(A=0.5680445610939323, A_INT=5)]

pandas_udfを記述し、追加されたA_INTでgroup byしてapplyしてみる。

# Schemaの設定
schema = T.StructType([
    T.StructField('A', T.DoubleType()),
    T.StructField('B', T.DoubleType()),
    T.StructField('C', T.DoubleType()),
    T.StructField('A_INT', T.IntegerType())
])

# 各カラムのmeanを取って返す
@F.pandas_udf(returnType=schema, functionType=F.PandasUDFType.GROUPED_MAP)
def stats_udf(pdf):
    mean = pdf.mean()
    return pd.DataFrame(mean).T

# ソートして結果を表示
tmp_df = tmp_df.groupby('A_INT').apply(stats_udf)
tmp_df.orderBy('A_INT').collect()

実行結果

[Row(A=0.049572991832265344, B=0.48636115573217087, C=0.49684142546384646, A_INT=0),
 Row(A=0.1485495542178649, B=0.4879194026859281, C=0.48570094684084514, A_INT=1),
 Row(A=0.24968414766986902, B=0.4994563633088348, C=0.501692179056708, A_INT=2),
 Row(A=0.35009481926917224, B=0.5036896862925305, C=0.4890935979154168, A_INT=3),
 Row(A=0.4491004446873258, B=0.5048263261034505, C=0.4939256054116147, A_INT=4),
 Row(A=0.5500180340896127, B=0.4871415585766198, C=0.5038413218268272, A_INT=5),
 Row(A=0.651505854442605, B=0.4885796294445262, C=0.5044430265925124, A_INT=6),
 Row(A=0.7511341385968157, B=0.4977673756360947, C=0.505599177518435, A_INT=7),
 Row(A=0.8500983501029348, B=0.5003624963995137, C=0.4937047504740419, A_INT=8),
 Row(A=0.949296647455288, B=0.49326581500608757, C=0.49799167586781556, A_INT=9)]

上記ではスキーマを1つ1つ指定したが、簡易記法で記述することもできる。

schema = 'A double, B double, C double, A_INT int'
@F.pandas_udf(returnType=schema, functionType=F.PandasUDFType.GROUPED_MAP)
def stats_udf(pdf):
    mean = pdf.mean()
    return pd.DataFrame(mean).T

# ソートして結果を表示
tmp_df = tmp_df.groupby('A_INT').apply(stats_udf)
tmp_df.orderBy('A_INT').collect()

group byされたデータはDataFrameとして渡されるので、データ量によってはメモリに気をつけて処理を行わないといけない。どの程度の規模のデータになるかわからない処理については使いづらい。

GROUPED_AGG

GROUPED_MAPはgroupのDataFrameをudfに渡したが、GROUPED_AGGはgroupのschemaをDataFrameに渡して

pandasで書くとこういう差。

# GROUPED_MAP
pandas_df.groupby('A_INT').apply(f)

# GROUPED_AGG
pandas_df.groupby('A_INT').column_a.agg(f)

GROUPED_AGGでgroup byしたカラムAの平均を取ってみる。

tenx_int_udf = F.udf(lambda x: int(x * 10), T.IntegerType())
tmp_df = df.withColumn('A_INT', tenx_int_udf('A'))
tmp_df.select('A', 'A_INT').take(5)

@F.pandas_udf(returnType=T.DoubleType(), functionType=F.PandasUDFType.GROUPED_AGG)
def stats_udf(pdf):
    return pdf.mean()

# ソートして結果を表示
tmp_df.groupby('A_INT').agg(stats_udf('A')) \
    .collect()

実行結果

[Row(A_INT=1, stats_udf(A)=0.14854955421786484),
 Row(A_INT=6, stats_udf(A)=0.651505854442605),
 Row(A_INT=3, stats_udf(A)=0.3500948192691723),
 Row(A_INT=4, stats_udf(A)=0.4491004446873261),
 Row(A_INT=7, stats_udf(A)=0.7511341385968164),
 Row(A_INT=0, stats_udf(A)=0.04957299183226535),
 Row(A_INT=8, stats_udf(A)=0.8500983501029343),
 Row(A_INT=5, stats_udf(A)=0.5500180340896124),
 Row(A_INT=2, stats_udf(A)=0.24968414766986913),
 Row(A_INT=9, stats_udf(A)=0.9492966474552892)]

カラム名がudf呼び出しの記述になってしまった。

aliasで名前を指定しておく。

tmp_df.groupby('A_INT').agg(stats_udf('A').alias('A_MEAN')) \ 
    .collect() 

実行結果

[Row(A_INT=1, A_MEAN=0.14854955421786484),
 Row(A_INT=6, A_MEAN=0.651505854442605),
 Row(A_INT=3, A_MEAN=0.35009481926917224),
 Row(A_INT=4, A_MEAN=0.44910044468732613),
 Row(A_INT=7, A_MEAN=0.7511341385968165),
 Row(A_INT=0, A_MEAN=0.04957299183226535),
 Row(A_INT=8, A_MEAN=0.8500983501029343),
 Row(A_INT=5, A_MEAN=0.5500180340896124),
 Row(A_INT=2, A_MEAN=0.24968414766986913),
 Row(A_INT=9, A_MEAN=0.9492966474552892)]

改定履歴

Author: Masato Watanabe, Date: 2019-07-02, 記事投稿