iMind Developers Blog

iMind開発者ブログ

PythonのfastavroでAvroの読み書き

概要

PythonでAvroのファイルを扱う。

avro-python3より速いと噂のfastavroを利用。

pandavroを使ったpandasとの連携も試す。

バージョン情報

  • Python3.7.3
  • fastavro==0.22.3
  • pandas==0.24.2
  • pandavro==1.5.0

インストール

$ pip install pandas fastavro pandavro

サンプルデータ

サンプルコードでは下記のスキーマを利用する。名前、年齢、性別が入るデータ構造。

{
  "namespace": "jp.imind.example.simple_users",
  "type": "record",
  "name": "user",
  "fields": [
      {"name": "name", "type": "string"},
      {"name": "age", "type": "integer"},
      {"name": "gender", "type": "integer"}
  ]
}

これを example.avsc という名前で保存しておく。

Avroファイルの保存

下記はdictionaryのlist形式のレコード情報を保存する処理。

import json
import fastavro

# スキーマの読込
with open('example.avsc', 'rt') as f:
    schema = json.load(f)
    schema = fastavro.parse_schema(schema)

# 出力するレコード
records = [{
        'name': 'tanaka',
        'age': 20,
        'gender': 1
}]

# Avroファイルの出力
with open('example.avro', 'wb') as f:
    fastavro.writer(
        f,
        schema,
        records
    )

ここではレコード追加時にファイルを「wb」で開いているが「ab+」などで開けば追記も可能。

fastavro.writerの実装は下記のようになっている。

https://github.com/fastavro/fastavro/blob/0.22.3/fastavro/_write.pyx#L551

fastavro.Writerを開いてflushするところまで自動でやっていることがわかる。

圧縮指定

writerのcodec引数にdeflateかsnappyを指定して圧縮することができる。

with open('example.avro', 'wb') as f:
    fastavro.writer(
        f,
        schema,
        records,
        codec='snappy'
    )

snappyをインストールする際はpython-snappyを入れておく。

condaで入れると手っ取り早い。

$ conda install python-snappy

1レコードずつwrite

fastavro.writerは出力ごとにflushしてしまうので、ループして1レコードずつ出力するような処理には向かない。

1レコードずつの出力をする場合はWriterを自前で定義して、最後にflushするような処理を書く。

with open('example.avro', 'wb') as f:
    writer = fastavro.write.Writer(f, schema, codec='snappy')
    for i in range(10000):
        # レコードの情報を生成
        record = {
                'name': 'tanaka',
                'age': i,
                'gender': i % 2 + 1
        }
        # 1レコードずつ追加
        writer.write(record)
    # 最後にflushする
    writer.flush()

上記の処理をもう少し簡易に書くために、適当なcontextmanagerでも作ってみる。

import fastavro
import contextlib

# withが終わったら自動でflushしたりcloseする処理
@contextlib.contextmanager
def open_fastavro(
        path,
        schema,
        mode='wb',
        codec='null',
        sync_interval=1000 * fastavro.read.SYNC_SIZE,
        metadata=None,
        validator=None,
        sync_marker=None):
    try:
        writer = None
        fo = open(path, mode=mode)
        writer = fastavro.write.Writer(
            fo,
            schema,
            codec,
            sync_interval,
            metadata,
            validator,
            sync_marker,
        )
        yield writer
    finally:
        if writer:
            writer.flush()
        fo.close()

これを使って書き込む。

with open_fastavro('example.avro', schema, codec='snappy') as writer:
    for i in range(100):
        # レコードの情報を生成
        record = {
                'name': 'tanaka',
                'age': i,
                'gender': i % 2 + 1
        }
        writer.write(record)

sync_intervalの指定

sync_intervalを指定すると、指定レコード数が書き込まれるたびにflushする。デフォルトは1万6千。

flush頻度が高すぎるとパフォーマンスにも圧縮サイズにも影響する。

例えばsync_intervalが100の場合と10万の場合で、100万レコードを出力してみる。

# 100万レコードを生成
records *= 1000000

# sync_interval=100
with open('example_1.avro', 'wb') as f:
    fastavro.writer(
        f,
        schema,
        records,
        sync_interval=100,
        codec='snappy'
    )

# sync_interval=10万
with open('example_2.avro', 'wb') as f:
    fastavro.writer(
        f,
        schema,
        records,
        sync_interval=100000,
        codec='snappy'
    )

パフォーマンスはこんな感じになる(10回計測した平均)。

sync_interval 実行時間 ファイルサイズ
100 1.34 sec 3.1M
10万 1.08 sec 417K

この例では同じレコードを100万回保存しているので過剰に圧縮効率が良くなっているが、ブロックサイズが大きい方が圧縮を効かせやすいのは確かなところ。

Avroファイルの読み込み

1レコードずつループしながら読み込む例。

import fastavro
with open('example.avro', 'rb') as f:
    for record in fastavro.reader(f):
        print(record)

# 実行結果
#=> {'name': 'tanaka', 'age': 20, 'gender': 1}

pandavroでのreadとwrite

read

import pandavro

df = pandavro.read_avro('example.avro')

write

pandavro.to_avro(
    'example3.avro', df, codec='snappy', append=True)

pandavro.to_avro(v1.5.0)のソースコードは下記。

https://github.com/ynqa/pandavro/blob/v1.5.0/pandavro/__init__.py#L126

既存の引数で不足を感じた場合は、上記のコードを参考にやりたい記述を書けば楽そう(MIT License)。

改定履歴

Author: Masato Watanabe, Date: 2019-08-05, 記事投稿