概要
PySparkで整形したrddをtoDFしようとしたら下記のようなエラーが起きて怒られた。
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
データを確認すると、処理結果の上位がNoneになっていて型の推測に失敗して落ちてしまっていた。
これを解決する為に型を指定してRDDからDataFrameに変換する。
バージョン情報
- spark-2.4.3
- Python 3.7.3
通常のtoDF
型がわかるような値を指定してrddを生成し、toDFしてみる。
from pyspark.sql import Row rdd1 = sc.parallelize([ Row(col1='foo', col2=1), Row(col1='bar', col2=2) ]) df1 = rdd1.toDF() df1.collect() #=> [Row(col1='foo', col2=1), Row(col1='bar', col2=2)]
schemaを確認すると、col1がStringType, col2がLongTypeと指定されている。
df1.schema #=> StructType(List( #=> StructField(col1,StringType,true), #=> StructField(col2,LongType,true)))
このように推測可能な値が設定されていれば、toDFで自動でschemaが生成される。
エラーになるケース
rddの上位のサンプリング内が偶然Noneばかりで埋められてしまった場合などにエラーが起きる。
rdd2 = sc.parallelize([ Row(col1=None, col2=1), Row(col1=None, col2=2) ]) rdd2.toDF() #=> ValueError: Some of types cannot be determined by the first 100 rows, #=> please try again with sampling
こうした場合はSchemaをちゃんと指定した上で、createDataFrameすればエラーにならない。
from pyspark.sql import types as T, functions as F # スキーマの設定 schema = T.StructType([ T.StructField('col1', T.StringType()), T.StructField('col2', T.LongType()) ]) # 先ほどエラーになったrdd rdd2 = sc.parallelize([ Row(col1=None, col2=1), Row(col1=None, col2=2) ]) # schemaを指定してDataFrameに変換 df2 = spark.createDataFrame(rdd2, schema) df2.collect() #=> [Row(col1=None, col2=1), Row(col1=None, col2=2)]
unionで怒られるケース
類似のケースで推測された型が違うのでunionに失敗するケースがある。
例えば下記は何かしらの都合で片方のカラムがBooleanTypeになっていて、StringTypeとunionしようとして失敗している。
# 2つ目のカラムをBooleanにしてDataFrameを生成する schema = T.StructType([ T.StructField('col1', T.StringType()), T.StructField('col2', T.BooleanType()) ]) df3 = spark.createDataFrame( sc.parallelize([ Row(col1='foo', col2=None), Row(col1='bar', col2=None) ]), schema ) # 2つ目のカラムをStringにしてDataFrameを生成する df4 = sc.parallelize([ Row(col1='foo', col2='1'), Row(col1='bar', col2='2') ]).toDF() # unionするとエラーになる df = df3.union(df4) #=> Union can only be performed on tables with the compatible column types. #=> string <> boolean at the second column of the second table;
schemaに差異が出来てしまった場合は、castして合わせる。
df3 = df3.withColumn('col2', df3.col2.cast(T.StringType()))
df = df3.union(df4)
df.collect()
実行結果
[Row(col1='foo', col2=None), Row(col1='bar', col2=None), Row(col1='foo', col2='1'), Row(col1='bar', col2='2')]
castできない場合はudfなどでcastする。下記は文字列を受け取り1ならTrue, 0ならFalseのBooleanに変換している。
df5 = sc.parallelize([ Row(col1='foo', col2='1'), Row(col1='bar', col2='2') ]).toDF() # 適当にbooleanに変換するudf to_boolean = F.udf(lambda x: True if x == '1' else False, T.BooleanType()) # col2をBooleanにcastする df5 = df5.withColumn('col2', to_boolean(df5.col2)) df5.collect() #=> [Row(col1='foo', col2=True), Row(col1='bar', col2=False)] # schemaの確認 df5.schema #=> StructType(List( #=> StructField(col1,StringType,true), #=> StructField(col2,BooleanType,true)))
改定履歴
Author: Masato Watanabe, Date: 2019-06-23, 記事投稿