概要
AirflowのConnectionsで設定したDBへの接続情報を利用して、Pythonのスクリプト上でDBへ接続する。
サービスで利用するDBの接続情報(ユーザー名/パスワード等)をAirflowで管理してしまおうという考え。
バージョン情報
- Python 3.6.7
- apache-airflow==1.10.1
connectionsの設定
AirflowのWeb画面でConnectionのページを開く。
Createを選択してDBの情報を入力する。Conn Typeは利用しているDBを選ぶ。
Pythonでのconnectionの取得
connectionの取得方法は各種Operatorのソースコードを参照した。
- airflow/mysql_operator.py at master · apache/airflow · GitHub
- airflow/postgres_operator.py at master · apache/airflow · GitHub
MySQLならfrom airflow.hooks.mysql_hook import MySqlHook、PostgreSQLならfrom airflow.hooks.postgres_hook import PostgresHookを呼び出して、runすればいいらしい。
from airflow.hooks.postgres_hook import PostgresHook hook = PostgresHook(postgres_conn_id='postgres_example') engine = hook.get_sqlalchemy_engine() conn = hook.get_conn()
内部的にはsqlalchemyを利用していて、get_sqlalchemy_engineでengineが、get_connでConnectionが取れる。
DB周りの処理はdbapi_hook.pyに書かれれている。
airflow/dbapi_hook.py at master · apache/airflow · GitHub
dbapi_hook.pyには他にも、psql.read_sqlを実行してくれるget_pandas_dfとか、cur.fetchallした結果を返すget_records、INSERT等を発行するrunなどが用意されているのでわざわざconnectionを取り出してどうこうすることもない。
dbapi_hookの機能確認
テスト用のデータを入れて各メソッドの動きを確認する。
CREATE TABLE example(str varchar(255), i integer); INSERT INTO example values ('foo', 1), ('bar', 2), ('baz', 3);
get_pandas_dfの例。
hook.get_pandas_df('select * from example')
結果
str i 0 foo 1 1 bar 2 2 baz 3
get_recordsの例。なんとなくparametersも指定。
hook.get_records('select * from example limit %s', parameters=('2'))
結果
[('foo', 1), ('bar', 2)]
get_firstの例。最初のレコードを取得する。
hook.get_first('select count(*) from example')
結果
(3,)
runの例。試しに1行目のfooをhogeにupdateしてみる。
hook.run("update example set str='hoge' where str='foo'", autocommit=True) hook.get_records('select * from example')
結果
[('bar', 2), ('baz', 3), ('hoge', 1)]
insert_rowsの例。テーブル名とvalueの一覧を引数で渡す。replace=Trueを設定するとINSERT INTOではなくREPLACE INTOが実行される。REPLACE INTOがないDBだと指定するとsyntax errorになる。
hook.insert_rows('example', [['fuga', 4], ['moge', 5]]) hook.get_records('select * from example')
結果
[('foo', 1), ('bar', 2), ('baz', 3), ('fuga', 4), ('moge', 5)]
MySQL/PostgreSQLのhookにはbulk_dump/bulk_loadも実装されている。
下記はPostgreSQLでbulk_dumpする例。
hook.bulk_dump('example', 'output.tsv')
結果(output.tsv)
foo 1 bar 2 baz 3 fuga 4 moge 5
改定履歴
Author: Masato Watanabe, Date: 2019-03-04, 記事投稿