iMind Developers Blog

iMind開発者ブログ

airflow

GCPのCloud ComposerでHello World

概要 Airflowの運用をGCP任せにできるCloud Composerを使ってPythonOperatorやBashOperatorを叩いてみる。 バージョン情報 Google Cloud SDK 264.0.0 composer-1.7.5-airflow-1.10.2 起動 GCPのWebUIからcomposerを選択し、environmentを作成する。 各項目に…

Airflowのremoved stateのTaskを削除する

概要 Airflowでdagの実行中にタスクをソースコードから削除すると、stateがremovedになったタスクがTask Instancesのトップに表示されたままになることがある。 フィルタでState not contains removedとすれば消えるけど、邪魔なので根本的に削除する方法を…

AirflowのCheckOperatorでDBの値チェック

概要 AirflowのCheckOperatorを使ってテーブルのカウント等のチェックを行う。 バージョン情報 Python 3.6.7 apache-airflow==1.10.3 CheckOperatorの種類 name description CheckOperator 0やemptyをチェックする ValueCheckOperator 値が指定値と同じか、…

Airflowの日付マクロまとめ

概要 Airflowのテンプレートに日付を埋め込む時に毎回ググってる気がしてきたので、自分用のまとめを書いた。 バージョン情報 apache-airflow==1.10.3 参考URL https://airflow.apache.org/macros.html 確認用スクリプト contextの中にいるtsとかdsとかds_no…

AirflowのConnectionやVariableをCLIで投入する

概要 ConnectionやVariableの値はWeb UIから入れられるけど、実運用ではコマンドラインから構築できるようになっていた方が便利。 試しにConnectionとVariableにそれぞれ値を投入してみる。 バージョン情報 Python 3.6.7 apache-airflow==1.10.3 Connections…

AirflowでMySQL/PostgreSQLのconnectionの取得

概要 AirflowのConnectionsで設定したDBへの接続情報を利用して、Pythonのスクリプト上でDBへ接続する。 サービスで利用するDBの接続情報(ユーザー名/パスワード等)をAirflowで管理してしまおうという考え。 バージョン情報 Python 3.6.7 apache-airflow==…

AirflowでFailedのタスクをまとめてリトライ

概要 週明けに出社したらAirflowのTask Instancesが真っ赤になっていた。 そんな時、1つずつタスクをリトライしていくのは辛いのでコマンドでまとめてリトライする方法を確認する。 また、再実行する必要がない場合にまとめてSuccessにアップデートする方法…

Apache AirflowのDBをPostgreSQLに切り替える

概要 Apache AirflowのDBはデフォルトではSQLiteになっているが、これをPostgreSQLに切り替えてみる。 バージョン情報 Python 3.6.5 apache-airflow==1.10.1 psycopg2==2.7.5 上記が既にインストールされている前提で作業を進める。 DBとUserの生成 下記のSQ…

AirflowでWeb画面からDAGを消せなかった場合

概要 AirflowでWeb画面からDAGを削除しようとした場合に「DAG with id {DAG_ID} not found. Cannot delete」というエラーが出ることがある。 これの削除の仕方。 バージョン情報 Python 3.6.7 apache-airflow==1.10.1 CUIでの削除 CUIから「airflow delete_d…

Airflowのdags内でairflow.cfgの値を参照する

概要 $AIRFLOW_HOME/airflow.cfgファイルに記述した内容をPythonのコードから取りたかった。 バージョン情報 Python 3.6.7 apache-airflow==1.10.1 settingsを参照する AIRFLOW_HOMEやDAGS_FOLDERの値はairflow.settingsに入っている。 import airflow # air…

Airflowでpythonのコードをspark-submit

概要 AirflowのSparkSubmitOperatorを使ってPySparkのスクリプトファイルをspark-submitで実行する。 バージョン情報 Python 3.6.7 apache-airflow==1.10.1 spark 2.3.1 PySpark側のコード 適当にHDFS上のファイルを読み込んで行数をcountするコードを書いて…

AirflowのSSHOperatorを使う

概要 AirflowのSSHOperatorで指定のサーバーにsshしてコマンドを実行する。 バージョン情報 apache-airflow==1.10.2 Python 3.6.8 SSHOperatorの引数 SSHOperator実行時はこのへんのパラメータを指定する。 parameter description ssh_conn_id ConnectionのI…

Airflowでタスク失敗時にSlackへメッセージを送る

概要 Airflowのタスクが失敗した際にSlackにメッセージを送るようにする。 トークン等はVariablesに保存して扱う。 バージョン情報 Python 3.6.7 apache-airflow==1.10.1 slackclient==1.3.0 導入 slackclientが必要になるので入れておく。 $ pip install sl…

Apache AirflowでHello World

概要 Airflowはスケジューリングやワークフローが制御できるソフトウェア。毎日数十数百のバッチを動かしていて管理が煩雑と感じている人が使うと幸せになれる。 Pythonのスクリプトで記述できたりpipで手軽に入れられるところがPythonユーザー的には使いや…