iMind Developers Blog

iMind開発者ブログ

PySparkでRDDからDataFrame変換時に型を指定する

概要

PySparkで整形したrddをtoDFしようとしたら下記のようなエラーが起きて怒られた。

ValueError: Some of types cannot be determined by the first 100 rows,
please try again with sampling

データを確認すると、処理結果の上位がNoneになっていて型の推測に失敗して落ちてしまっていた。

これを解決する為に型を指定してRDDからDataFrameに変換する。

バージョン情報

  • spark-2.4.3
  • Python 3.7.3

通常のtoDF

型がわかるような値を指定してrddを生成し、toDFしてみる。

from pyspark.sql import Row

rdd1 = sc.parallelize([
    Row(col1='foo', col2=1),
    Row(col1='bar', col2=2)
])
df1 = rdd1.toDF()

df1.collect()
    #=> [Row(col1='foo', col2=1), Row(col1='bar', col2=2)]  

schemaを確認すると、col1がStringType, col2がLongTypeと指定されている。

df1.schema                                                                                                                   
    #=> StructType(List(
    #=>        StructField(col1,StringType,true),
    #=>        StructField(col2,LongType,true)))

このように推測可能な値が設定されていれば、toDFで自動でschemaが生成される。

エラーになるケース

rddの上位のサンプリング内が偶然Noneばかりで埋められてしまった場合などにエラーが起きる。

rdd2 = sc.parallelize([
    Row(col1=None, col2=1),
    Row(col1=None, col2=2)
])

rdd2.toDF()
    #=> ValueError: Some of types cannot be determined by the first 100 rows,
    #=> please try again with sampling

こうした場合はSchemaをちゃんと指定した上で、createDataFrameすればエラーにならない。

from pyspark.sql import types as T, functions as F

# スキーマの設定
schema = T.StructType([
    T.StructField('col1', T.StringType()),
    T.StructField('col2', T.LongType())
])

# 先ほどエラーになったrdd
rdd2 = sc.parallelize([
    Row(col1=None, col2=1),
    Row(col1=None, col2=2)
])

# schemaを指定してDataFrameに変換
df2 = spark.createDataFrame(rdd2, schema)
df2.collect()                                                                                                                
    #=> [Row(col1=None, col2=1), Row(col1=None, col2=2)] 

unionで怒られるケース

類似のケースで推測された型が違うのでunionに失敗するケースがある。

例えば下記は何かしらの都合で片方のカラムがBooleanTypeになっていて、StringTypeとunionしようとして失敗している。

# 2つ目のカラムをBooleanにしてDataFrameを生成する
schema = T.StructType([
    T.StructField('col1', T.StringType()),
    T.StructField('col2', T.BooleanType())
])
df3 = spark.createDataFrame(
    sc.parallelize([
        Row(col1='foo', col2=None),
        Row(col1='bar', col2=None)
    ]),
    schema
)

# 2つ目のカラムをStringにしてDataFrameを生成する
df4 = sc.parallelize([
    Row(col1='foo', col2='1'),
    Row(col1='bar', col2='2')
]).toDF()

# unionするとエラーになる
df = df3.union(df4)
    #=> Union can only be performed on tables with the compatible column types.
    #=> string <> boolean at the second column of the second table;

schemaに差異が出来てしまった場合は、castして合わせる。

df3 = df3.withColumn('col2', df3.col2.cast(T.StringType())) 
df = df3.union(df4)
df.collect()

実行結果

[Row(col1='foo', col2=None),
 Row(col1='bar', col2=None),
 Row(col1='foo', col2='1'),
 Row(col1='bar', col2='2')]

castできない場合はudfなどでcastする。下記は文字列を受け取り1ならTrue, 0ならFalseのBooleanに変換している。

df5 = sc.parallelize([
    Row(col1='foo', col2='1'),
    Row(col1='bar', col2='2')
]).toDF()

# 適当にbooleanに変換するudf
to_boolean = F.udf(lambda x: True if x == '1' else False, T.BooleanType())

# col2をBooleanにcastする
df5 = df5.withColumn('col2', to_boolean(df5.col2))
df5.collect()
    #=> [Row(col1='foo', col2=True), Row(col1='bar', col2=False)] 

# schemaの確認
df5.schema
    #=> StructType(List(
    #=>     StructField(col1,StringType,true),
    #=>     StructField(col2,BooleanType,true)))

改定履歴

Author: Masato Watanabe, Date: 2019-06-23, 記事投稿