iMind Developers Blog

iMind開発者ブログ

PySparkで複数のArrayカラムをconcatする

概要

Spark2.4から複数のArrayカラムをconcatできるようになったらしいので試してみる。

https://issues.apache.org/jira/browse/SPARK-23736

バージョン情報

  • Spark 2.4.3

サンプルデータ

サンプルデータとして、3つのArrayのカラム(名前はfoo, bar, baz)を持つDataFrameを用意する。

import pandas as pd
import numpy as np
np.random.seed(0)

# 0〜10までの値がランダムに入った配列データの生成
data = {
    'foo': np.random.randint(0, 10, (100, 3)).tolist(),
    'bar': np.random.randint(10, 20, (100, 3)).tolist(),
    'baz': np.random.randint(20, 30, (100, 3)).tolist()
}
pandas_df = pd.DataFrame(data, index=range(100))


# SparkのDataFrameに変換してtmp_exampleに保存
df = spark.createDataFrame(pandas_df)
df.registerTempTable('tmp_example')

# 生成データの確認
df.take(3)
    #=> [Row(foo=[2, 0, 4], bar=[17, 19, 18], baz=[27, 28, 22]),
    #=>  Row(foo=[5, 7, 3], bar=[10, 10, 10], baz=[24, 29, 27]),
    #=>  Row(foo=[6, 5, 5], bar=[12, 16, 18], baz=[22, 28, 22])]

concat

生成したデータに対して本題のconcatを使ってみる。

from pyspark.sql import functions as F

df.select(F.concat('foo', 'bar', 'baz').alias('new_column')).take(3)
    #=> [Row(new_column=[2, 0, 4, 17, 19, 18, 27, 28, 22]),
    #=>  Row(new_column=[5, 7, 3, 10, 10, 10, 24, 29, 27]),
    #=>  Row(new_column=[6, 5, 5, 12, 16, 18, 22, 28, 22])]

並び順にそのままにconcatされた。便利。

2.4より前のPySparkでconcat

2.4以前の場合は適当にudfでも書いて実行しておけば良いだろうか。

from functools import reduce

@F.udf(T.ArrayType(T.IntegerType()))
def concat_udf(*arr):
        return reduce(lambda x, y: x + y, arr)

df.select(concat_udf('foo', 'bar', 'baz').alias('new_column')).take(3)
    #=> [Row(new_column=[2, 0, 4, 17, 19, 18, 27, 28, 22]),
    #=>  Row(new_column=[5, 7, 3, 10, 10, 10, 24, 29, 27]),
    #=>  Row(new_column=[6, 5, 5, 12, 16, 18, 22, 28, 22])]

改定履歴

Author: Masato Watanabe, Date: 2019-08-24, 記事投稿