iMind Developers Blog

iMind開発者ブログ

PostgreSQL + Pythonでcopy from stdin with csv

概要

PostgreSQLで大量データを投入する際にcopy from stdin with csvを使った方が早いと聞いたので試す。

バージョン情報

  • Python 3.6
  • PostgreSQL 9.5.12
  • psycopg2==2.8.4

サンプルテーブル

下記のようなテーブルを用意する。

create table example (
    name varchar(50),
    age smallint,
    birthday timestamp
);

コマンドラインからの実行

下記のようなcsvファイルを用意する。ファイル名は example.csv とする。

川田,31,1989-10-11
横山,30,1990-03-15
沢村,35,1985-05-31

psqlクライアントから下記を実行する。

\COPY example
FROM 'example.csv' DELIMITER ',' CSV;

データが入ったか確認。

select * from example;

 name | age |      birthday       
------+-----+---------------------
 川田 |  31 | 1989-10-11 00:00:00
 横山 |  30 | 1990-03-15 00:00:00
 沢村 |  35 | 1985-05-31 00:00:00

上記はcsvファイルを用意してから実行したが、下記のように FROM stdin とすると標準入力で値を

\COPY example
FROM stdin DELIMITER ',' CSV;

コピーするデータに続いて改行を入力します。
バックスラッシュ(\)とピリオドだけの行で終了します。

>> 藤井,21,1998-12-10
>> \.

Psycopgでの実行

Psycopgにはcopy_fromというcopy用のメソッドが用意されている。

copy_from(file, table, sep='\t', null='\\N', size=8192, columns=None)

1つ目の引数にはfile-likeを指定する。

import psycopg2

conn = psycopg2.connect(database=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port="5432")
cur = conn.cursor()
with open('example.csv') as fp:
    cur.copy_from(fp, table='example', sep=',', columns=['name', 'age', 'birthday'])
conn.commit()

file-likeでいけるということは、io.StringIOでもいける。

import io
import psycopg2

csv_string = '''鹿島,25,1995-11-12
庄司,27,1993-06-06'''

conn = psycopg2.connect(database=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port="5432")
cur = conn.cursor()
sio = io.StringIO(csv_string)
cur.copy_from(sio, table='example', sep=',', columns=['name', 'age', 'birthday'])
conn.commit()

実行時間の比較

10万件のデータをINSERTして実行時間を比べてみる。

対抗相手はpandasのto_sqlの method=None(1つずつINSERT)、method=multi(複数行をまとめてINSERT)、今回のCOPYの3つで計測してみる。

まずは10万行データを適当に生成。

import datetime
import numpy as np
import pandas as pd

# ランダムで合成する適当な文字列を用意
hiragana = [chr(i) for i in range(12353, 12436)]
katakana = [chr(i) for i in range(12449, 12533)]
chars = hiragana + katakana

# 適当な10万件データの生成
df = pd.DataFrame(index=np.arange(100000), columns=[])
df['age'] = np.random.randint(0, 100, 100000)
df['name'] = df.age.apply(lambda x: ''.join(np.random.choice(chars, 10)))
df['birthday'] = df.age.apply(lambda x: datetime.datetime.now() - datetime.timedelta(days=x*365 + np.random.randint(0, 365)))

# こんな感じのデータができました
df.head()

    #=>    age        name                   birthday
    #=> 0   55  ョヨてやいみィトぞま 1964-11-23 23:09:48.125673
    #=> 1   24  コめたつレみネクえぱ 1995-11-10 23:09:48.125754
    #=> 2   10  ゴイぴぢキヴビユおや 2009-04-03 23:09:48.125775
    #=> 3   60  くギせウごゆぺヌいヰ 1959-11-04 23:09:48.125790
    #=> 4   15  ぶハしづしウふゅゾレ 2004-07-25 23:09:48.125804

to_sql(method=None) でデータ投入。

import sqlalchemy

engine = sqlalchemy.create_engine('postgresql://{user}@{host}:{port}/{db}'.format(
    user=DB_USER, host=DB_HOST, port=DB_PORT, db=DB_NAME))

%time df.to_sql('example', con=engine, if_exists='append', index=False)

    #=> CPU times: user 5.37 s, sys: 1.52 s, total: 6.89 s
    #=> Wall time: 27.5 s
import sqlalchemy

engine = sqlalchemy.create_engine('postgresql://{user}@{host}:{port}/{db}'.format(
    user=DB_USER, host=DB_HOST, port=DB_PORT, db=DB_NAME))

%time df.to_sql('example', con=engine, if_exists='append', index=False, method='multi')

    #=> CPU times: user 7.88 s, sys: 338 ms, total: 8.22 s
    #=> Wall time: 12.2 s
import io
import psycopg2

sio = io.StringIO()
df['birth_date'] = df.birthday.apply(lambda d: d.strftime('%Y-%m-%d'))
%time df.to_csv(sio, index=False, header=False, columns=['name', 'age', 'birth_date'])
    #=> CPU times: user 856 ms, sys: 24 ms, total: 880 ms
    #=> Wall time: 874 ms

print(sio.getvalue()[0:100])                                                                                                     
    #=> タヤッっずソノケワォ,49,1970-10-26
    #=> きくぷゎねバゑゥダミ,33,1986-04-25
    #=> ユるらヲヲッジバァろ,2,2017-08-16
    #=> ゴふやアそかソクシん,72,1947-08-11
    #=> ソ

%%time
with psycopg2.connect(database=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port="5432") as conn:
    cur = conn.cursor()
    sio.seek(0)
    cur.copy_from(sio, table='example', sep=',', columns=['name', 'age', 'birthday'])

    #=> CPU times: user 20.9 ms, sys: 9.92 ms, total: 30.8 ms
    #=> Wall time: 306 ms

to_sql でINSERTを発行したケースではmultiでも12秒かかっているのに対して、copy_fromでは306ミリ秒と圧倒的に速くなっていました。

SQLAlchemyでcopy

上記ではpsycopg2のcopy_fromを用いた。

SQLAlchemyから直接実行する場合は、cursor.copy_expertでSQLを実行すれば良いらしい。

import sqlalchemy

engine = sqlalchemy.create_engine('postgresql://{user}@{host}:{port}/{db}'.format(
    user=DB_USER, host=DB_HOST, port=DB_PORT, db=DB_NAME))

sql = """COPY example FROM stdin DELIMITER ',' CSV"""

sio = io.StringIO()
df['birth_date'] = df.birthday.apply(lambda d: d.strftime('%Y-%m-%d'))
df.to_csv(sio, index=False, header=False, columns=['name', 'age', 'birth_date'])

conn = engine.raw_connection()
cursor = conn.cursor()
sio.seek(0)
cursor.copy_expert(sql, sio)
conn.commit()
conn.close()

参考

https://github.com/jmcarp/sqlalchemy-postgres-copy/blob/master/postgres_copy/init.py

改定履歴

Author: Masato Watanabe, Date: 2020-03-07, 記事投稿