iMind Developers Blog

iMind開発者ブログ

AirflowでMySQL/PostgreSQLのconnectionの取得

概要

AirflowのConnectionsで設定したDBへの接続情報を利用して、Pythonのスクリプト上でDBへ接続する。

サービスで利用するDBの接続情報(ユーザー名/パスワード等)をAirflowで管理してしまおうという考え。

バージョン情報

  • Python 3.6.7
  • apache-airflow==1.10.1

connectionsの設定

AirflowのWeb画面でConnectionのページを開く。

f:id:mwsoft:20190114194754p:plain

Createを選択してDBの情報を入力する。Conn Typeは利用しているDBを選ぶ。

f:id:mwsoft:20190114195101p:plain

Pythonでのconnectionの取得

connectionの取得方法は各種Operatorのソースコードを参照した。

MySQLならfrom airflow.hooks.mysql_hook import MySqlHook、PostgreSQLならfrom airflow.hooks.postgres_hook import PostgresHookを呼び出して、runすればいいらしい。

from airflow.hooks.postgres_hook import PostgresHook
hook = PostgresHook(postgres_conn_id='postgres_example')

engine = hook.get_sqlalchemy_engine()
conn = hook.get_conn()

内部的にはsqlalchemyを利用していて、get_sqlalchemy_engineでengineが、get_connでConnectionが取れる。

DB周りの処理はdbapi_hook.pyに書かれれている。

airflow/dbapi_hook.py at master · apache/airflow · GitHub

dbapi_hook.pyには他にも、psql.read_sqlを実行してくれるget_pandas_dfとか、cur.fetchallした結果を返すget_records、INSERT等を発行するrunなどが用意されているのでわざわざconnectionを取り出してどうこうすることもない。

dbapi_hookの機能確認

テスト用のデータを入れて各メソッドの動きを確認する。

CREATE TABLE  example(str varchar(255), i integer);
INSERT INTO example values ('foo', 1), ('bar', 2), ('baz', 3);

get_pandas_dfの例。

hook.get_pandas_df('select * from example')

結果

   str  i
0  foo  1
1  bar  2
2  baz  3

get_recordsの例。なんとなくparametersも指定。

hook.get_records('select * from example limit %s', parameters=('2'))

結果

 [('foo', 1), ('bar', 2)]

get_firstの例。最初のレコードを取得する。

hook.get_first('select count(*) from example')

結果

(3,)

runの例。試しに1行目のfooをhogeにupdateしてみる。

hook.run("update example set str='hoge' where str='foo'", autocommit=True)
hook.get_records('select * from example')

結果

[('bar', 2), ('baz', 3), ('hoge', 1)]

insert_rowsの例。テーブル名とvalueの一覧を引数で渡す。replace=Trueを設定するとINSERT INTOではなくREPLACE INTOが実行される。REPLACE INTOがないDBだと指定するとsyntax errorになる。

hook.insert_rows('example', [['fuga', 4], ['moge', 5]])
hook.get_records('select * from example')

結果

[('foo', 1), ('bar', 2), ('baz', 3), ('fuga', 4), ('moge', 5)]

MySQL/PostgreSQLのhookにはbulk_dump/bulk_loadも実装されている。

下記はPostgreSQLでbulk_dumpする例。

hook.bulk_dump('example', 'output.tsv')

結果(output.tsv)

foo  1
bar 2
baz 3
fuga    4
moge    5

改定履歴

Author: Masato Watanabe, Date: 2019-03-04, 記事投稿