概要
PySparkでhiveのテーブルのpartitionから最新のものを取りたかった。
何か良い方法を前に見かけたような気がするのだけど思い出せないのでshow partitionsで取れた文字列をパースする。
バージョン情報
- Spark 2.4.3
テーブル生成
partitionカラムとしてbazとbarの2つを持つfooというテーブルを作成する。
spark.sql(""" create table tmp.foo ( foo string ) partitioned by ( bar string, baz string ) stored as sequencefile """)
データ投入
作成したテーブルにデータを投入してみる。insertIntoを使うと勝手にDynamicにPartitionを定めてくれる。
from pyspark.sql import Row # 投入するデータの生成 df = spark.createDataFrame([ Row(foo='foo1', bar='bar1', baz='baz1'), Row(foo='foo2', bar='bar2', baz='baz2'), ]) df.collect() #=> [Row(bar='bar1', baz='baz1', foo='foo1'), #=> Row(bar='bar2', baz='baz2', foo='foo2')] # insert df.write.insertInto('tmp.foo')
パーティション確認
show partitionsを実行して結果をpandasで取得。
partitions = spark.sql('show partitions tmp.foo').toPandas() #=> partition #=> 0 bar=baz1/baz=foo1 #=> 1 bar=baz2/baz=foo2
文字列をパースして list(dict) に変換してみる。
def split_partition(s): partitions = s.split('/') return dict([p.split('=') for p in partitions]) list(map(split_partition, partitions.partition)) #=> [{'bar': 'baz1', 'baz': 'foo1'}, {'bar': 'baz2', 'baz': 'foo2'}]
パーティションの文字列にイコールやスラッシュが入った場合はどうなるか。
df = spark.createDataFrame([ Row(foo='foo1', bar='bar=1', baz='baz1'), ]) df.collect() #=> [Row(bar='bar=1', baz='baz1', foo='foo1')] df.write.insertInto('tmp.foo') partitions = spark.sql('show partitions tmp.foo').toPandas() #=> partition #=> 0 bar=baz1/baz=foo1
あれ? bar='bar=1' を入れたのに bar=baz1がpartitionになっている。。。
なんか怪しいので見なかったことにする。
改定履歴
Author: Masato Watanabe, Date: 2019-09-02, 記事投稿