概要
PostgreSQLで大量データを投入する際にcopy from stdin with csvを使った方が早いと聞いたので試す。
バージョン情報
- Python 3.6
- PostgreSQL 9.5.12
- psycopg2==2.8.4
サンプルテーブル
下記のようなテーブルを用意する。
create table example ( name varchar(50), age smallint, birthday timestamp );
コマンドラインからの実行
下記のようなcsvファイルを用意する。ファイル名は example.csv とする。
川田,31,1989-10-11 横山,30,1990-03-15 沢村,35,1985-05-31
psqlクライアントから下記を実行する。
\COPY example FROM 'example.csv' DELIMITER ',' CSV;
データが入ったか確認。
select * from example; name | age | birthday ------+-----+--------------------- 川田 | 31 | 1989-10-11 00:00:00 横山 | 30 | 1990-03-15 00:00:00 沢村 | 35 | 1985-05-31 00:00:00
上記はcsvファイルを用意してから実行したが、下記のように FROM stdin とすると標準入力で値を
\COPY example FROM stdin DELIMITER ',' CSV; コピーするデータに続いて改行を入力します。 バックスラッシュ(\)とピリオドだけの行で終了します。 >> 藤井,21,1998-12-10 >> \.
Psycopgでの実行
Psycopgにはcopy_fromというcopy用のメソッドが用意されている。
copy_from(file, table, sep='\t', null='\\N', size=8192, columns=None)
1つ目の引数にはfile-likeを指定する。
import psycopg2 conn = psycopg2.connect(database=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port="5432") cur = conn.cursor() with open('example.csv') as fp: cur.copy_from(fp, table='example', sep=',', columns=['name', 'age', 'birthday']) conn.commit()
file-likeでいけるということは、io.StringIOでもいける。
import io import psycopg2 csv_string = '''鹿島,25,1995-11-12 庄司,27,1993-06-06''' conn = psycopg2.connect(database=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port="5432") cur = conn.cursor() sio = io.StringIO(csv_string) cur.copy_from(sio, table='example', sep=',', columns=['name', 'age', 'birthday']) conn.commit()
実行時間の比較
10万件のデータをINSERTして実行時間を比べてみる。
対抗相手はpandasのto_sqlの method=None(1つずつINSERT)、method=multi(複数行をまとめてINSERT)、今回のCOPYの3つで計測してみる。
まずは10万行データを適当に生成。
import datetime import numpy as np import pandas as pd # ランダムで合成する適当な文字列を用意 hiragana = [chr(i) for i in range(12353, 12436)] katakana = [chr(i) for i in range(12449, 12533)] chars = hiragana + katakana # 適当な10万件データの生成 df = pd.DataFrame(index=np.arange(100000), columns=[]) df['age'] = np.random.randint(0, 100, 100000) df['name'] = df.age.apply(lambda x: ''.join(np.random.choice(chars, 10))) df['birthday'] = df.age.apply(lambda x: datetime.datetime.now() - datetime.timedelta(days=x*365 + np.random.randint(0, 365))) # こんな感じのデータができました df.head() #=> age name birthday #=> 0 55 ョヨてやいみィトぞま 1964-11-23 23:09:48.125673 #=> 1 24 コめたつレみネクえぱ 1995-11-10 23:09:48.125754 #=> 2 10 ゴイぴぢキヴビユおや 2009-04-03 23:09:48.125775 #=> 3 60 くギせウごゆぺヌいヰ 1959-11-04 23:09:48.125790 #=> 4 15 ぶハしづしウふゅゾレ 2004-07-25 23:09:48.125804
to_sql(method=None) でデータ投入。
import sqlalchemy engine = sqlalchemy.create_engine('postgresql://{user}@{host}:{port}/{db}'.format( user=DB_USER, host=DB_HOST, port=DB_PORT, db=DB_NAME)) %time df.to_sql('example', con=engine, if_exists='append', index=False) #=> CPU times: user 5.37 s, sys: 1.52 s, total: 6.89 s #=> Wall time: 27.5 s
import sqlalchemy engine = sqlalchemy.create_engine('postgresql://{user}@{host}:{port}/{db}'.format( user=DB_USER, host=DB_HOST, port=DB_PORT, db=DB_NAME)) %time df.to_sql('example', con=engine, if_exists='append', index=False, method='multi') #=> CPU times: user 7.88 s, sys: 338 ms, total: 8.22 s #=> Wall time: 12.2 s
import io import psycopg2 sio = io.StringIO() df['birth_date'] = df.birthday.apply(lambda d: d.strftime('%Y-%m-%d')) %time df.to_csv(sio, index=False, header=False, columns=['name', 'age', 'birth_date']) #=> CPU times: user 856 ms, sys: 24 ms, total: 880 ms #=> Wall time: 874 ms print(sio.getvalue()[0:100]) #=> タヤッっずソノケワォ,49,1970-10-26 #=> きくぷゎねバゑゥダミ,33,1986-04-25 #=> ユるらヲヲッジバァろ,2,2017-08-16 #=> ゴふやアそかソクシん,72,1947-08-11 #=> ソ %%time with psycopg2.connect(database=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port="5432") as conn: cur = conn.cursor() sio.seek(0) cur.copy_from(sio, table='example', sep=',', columns=['name', 'age', 'birthday']) #=> CPU times: user 20.9 ms, sys: 9.92 ms, total: 30.8 ms #=> Wall time: 306 ms
to_sql でINSERTを発行したケースではmultiでも12秒かかっているのに対して、copy_fromでは306ミリ秒と圧倒的に速くなっていました。
SQLAlchemyでcopy
上記ではpsycopg2のcopy_fromを用いた。
SQLAlchemyから直接実行する場合は、cursor.copy_expertでSQLを実行すれば良いらしい。
import sqlalchemy engine = sqlalchemy.create_engine('postgresql://{user}@{host}:{port}/{db}'.format( user=DB_USER, host=DB_HOST, port=DB_PORT, db=DB_NAME)) sql = """COPY example FROM stdin DELIMITER ',' CSV""" sio = io.StringIO() df['birth_date'] = df.birthday.apply(lambda d: d.strftime('%Y-%m-%d')) df.to_csv(sio, index=False, header=False, columns=['name', 'age', 'birth_date']) conn = engine.raw_connection() cursor = conn.cursor() sio.seek(0) cursor.copy_expert(sql, sio) conn.commit() conn.close()
参考
https://github.com/jmcarp/sqlalchemy-postgres-copy/blob/master/postgres_copy/init.py
改定履歴
Author: Masato Watanabe, Date: 2020-03-07, 記事投稿