概要
PythonでAvroのファイルを扱う。
avro-python3より速いと噂のfastavroを利用。
pandavroを使ったpandasとの連携も試す。
バージョン情報
- Python3.7.3
- fastavro==0.22.3
- pandas==0.24.2
- pandavro==1.5.0
インストール
$ pip install pandas fastavro pandavro
サンプルデータ
サンプルコードでは下記のスキーマを利用する。名前、年齢、性別が入るデータ構造。
{ "namespace": "jp.imind.example.simple_users", "type": "record", "name": "user", "fields": [ {"name": "name", "type": "string"}, {"name": "age", "type": "integer"}, {"name": "gender", "type": "integer"} ] }
これを example.avsc という名前で保存しておく。
Avroファイルの保存
下記はdictionaryのlist形式のレコード情報を保存する処理。
import json import fastavro # スキーマの読込 with open('example.avsc', 'rt') as f: schema = json.load(f) schema = fastavro.parse_schema(schema) # 出力するレコード records = [{ 'name': 'tanaka', 'age': 20, 'gender': 1 }] # Avroファイルの出力 with open('example.avro', 'wb') as f: fastavro.writer( f, schema, records )
ここではレコード追加時にファイルを「wb」で開いているが「ab+」などで開けば追記も可能。
fastavro.writerの実装は下記のようになっている。
https://github.com/fastavro/fastavro/blob/0.22.3/fastavro/_write.pyx#L551
fastavro.Writerを開いてflushするところまで自動でやっていることがわかる。
圧縮指定
writerのcodec引数にdeflateかsnappyを指定して圧縮することができる。
with open('example.avro', 'wb') as f: fastavro.writer( f, schema, records, codec='snappy' )
snappyをインストールする際はpython-snappyを入れておく。
condaで入れると手っ取り早い。
$ conda install python-snappy
1レコードずつwrite
fastavro.writerは出力ごとにflushしてしまうので、ループして1レコードずつ出力するような処理には向かない。
1レコードずつの出力をする場合はWriterを自前で定義して、最後にflushするような処理を書く。
with open('example.avro', 'wb') as f: writer = fastavro.write.Writer(f, schema, codec='snappy') for i in range(10000): # レコードの情報を生成 record = { 'name': 'tanaka', 'age': i, 'gender': i % 2 + 1 } # 1レコードずつ追加 writer.write(record) # 最後にflushする writer.flush()
上記の処理をもう少し簡易に書くために、適当なcontextmanagerでも作ってみる。
import fastavro import contextlib # withが終わったら自動でflushしたりcloseする処理 @contextlib.contextmanager def open_fastavro( path, schema, mode='wb', codec='null', sync_interval=1000 * fastavro.read.SYNC_SIZE, metadata=None, validator=None, sync_marker=None): try: writer = None fo = open(path, mode=mode) writer = fastavro.write.Writer( fo, schema, codec, sync_interval, metadata, validator, sync_marker, ) yield writer finally: if writer: writer.flush() fo.close()
これを使って書き込む。
with open_fastavro('example.avro', schema, codec='snappy') as writer: for i in range(100): # レコードの情報を生成 record = { 'name': 'tanaka', 'age': i, 'gender': i % 2 + 1 } writer.write(record)
sync_intervalの指定
sync_intervalを指定すると、指定レコード数が書き込まれるたびにflushする。デフォルトは1万6千。
flush頻度が高すぎるとパフォーマンスにも圧縮サイズにも影響する。
例えばsync_intervalが100の場合と10万の場合で、100万レコードを出力してみる。
# 100万レコードを生成 records *= 1000000 # sync_interval=100 with open('example_1.avro', 'wb') as f: fastavro.writer( f, schema, records, sync_interval=100, codec='snappy' ) # sync_interval=10万 with open('example_2.avro', 'wb') as f: fastavro.writer( f, schema, records, sync_interval=100000, codec='snappy' )
パフォーマンスはこんな感じになる(10回計測した平均)。
sync_interval | 実行時間 | ファイルサイズ |
---|---|---|
100 | 1.34 sec | 3.1M |
10万 | 1.08 sec | 417K |
この例では同じレコードを100万回保存しているので過剰に圧縮効率が良くなっているが、ブロックサイズが大きい方が圧縮を効かせやすいのは確かなところ。
Avroファイルの読み込み
1レコードずつループしながら読み込む例。
import fastavro with open('example.avro', 'rb') as f: for record in fastavro.reader(f): print(record) # 実行結果 #=> {'name': 'tanaka', 'age': 20, 'gender': 1}
pandavroでのreadとwrite
read
import pandavro df = pandavro.read_avro('example.avro')
write
pandavro.to_avro( 'example3.avro', df, codec='snappy', append=True)
pandavro.to_avro(v1.5.0)のソースコードは下記。
https://github.com/ynqa/pandavro/blob/v1.5.0/pandavro/__init__.py#L126
既存の引数で不足を感じた場合は、上記のコードを参考にやりたい記述を書けば楽そう(MIT License)。
改定履歴
Author: Masato Watanabe, Date: 2019-08-05, 記事投稿