概要
Airflowはスケジューリングやワークフローが制御できるソフトウェア。毎日数十数百のバッチを動かしていて管理が煩雑と感じている人が使うと幸せになれる。
Pythonのスクリプトで記述できたりpipで手軽に入れられるところがPythonユーザー的には使いやすい。
今回は導入からhello worldとログに残すだけの簡易な記述を動かすところまでを実行する。
バージョン情報
- Python 3.6.7
- apache-airflow==1.10.2
導入
他のPython実行環境とわけた方がいいケースが多いので、Airflow用のcondaのenvを作ってそこに入れてみる。
$ conda create -n airflow $ source activate airflow $ pip install apache-airflow
が、こんなエラーが出た。
RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install or upgrade Airflow. To force installing the GPL version set AIRFLOW_GPL_UNIDECODE
airflow自体はApacheライセンスだけどslugifyが使っているunidecodeがGPLだけどどうするべ、みたいなことが聞かれている。気にしないので下記でyesを指定してインストール。
AIRFLOW_GPL_UNIDECODE=yes pip install apache-airflow
下記公式サイトの手順に従ってざっくり動かしてみる
https://airflow.apache.org/start.html
$ airflow initdb $ airflow webserver -p 8080
これで http://localhost:8080/ にリクエストするとAirflowの画面が確認できる。いろいろなサンプルが入っているので内容を確認していくと使い方がなんとなくわかる。
デフォルトではWebサーバーはFlask、DBはSQLiteで 動いていて ~/airflow がAIRFLOW_HOMEとして扱われる。AIRFLOW_HOME配下にはSQLiteのdbファイルや各種設定ファイルが配置される。
hello world
さっそくdagを記述してAirflowでhello worldしてみる。
自動生成された $AIRFLOW_HOME//airflow.cfg によるとdagは $AIRFLOW_HOME/dags に配置することになっている。
dags_folder = /home/user/airflow/dags
ちなみにairflow.cfgのパス指定はこう書いても大丈夫。
dags_folder = ~/airflow/dags
当該フォルダを作る
$ cd ~/airflow $ mkdir dags $ cd dags
dags配下に下記の内容を記述した hello_dag.py というファイルを置く。
from airflow import DAG from airflow.operators.python_operator import PythonOperator import datetime, logging # 今回呼び出すhello world関数 def hello(): logging.info('hello world') # 同goodbye関数 def goodbye(): logging.info('goodbye world') # 必須項目のownerとstart_dateを埋める。start_dateはスケジュールの開始日時。現在に近い日時を入れておく。 args = { 'owner': 'masato watanabe', 'start_date': datetime.datetime(2018, 12, 1) } # DAGの設定。schedule_intervalはcron形式で書ける。ここでは毎日0時を設定。 dag = DAG('hello_dag', default_args=args, schedule_interval='0 0 * * *') # hello world用のPythonOperator hello = PythonOperator( task_id='hello', python_callable=hello, dag=dag ) # 同goodbye用のOperator goodbye = PythonOperator( task_id='goodbye', python_callable=goodbye, dag=dag ) # hello, goodbyeの実行 hello >> goodbye
helloの方をrunしてみる。
airflow runでdag_id、task_id、日時を指定して実行する。
$ airflow run hello_dag hello 2018-12-03T00:00
実行すると下記にログができている。
../logs/hello_dag/hello/2018-12-03T00\:00\:00+00\:00/1.log
ログの中にはlogging.infoした情報と、Return Value(何も返してないのでNone)が記載されている。
[2018-12-12 13:16:34,228] {logging_mixin.py:95} INFO - [2018-12-12 13:16:34,227] {hello_dag.py:6} INFO - hello world [2018-12-12 13:16:34,228] {python_operator.py:96} INFO - Done. Returned value was: None
goodbyeの方も実行。
$ airflow run hello_dag goodbye 2018-12-03T00:00
hello_dag/goodbyeの方にログが出力される。
../logs/hello_dag/goodbye/2018-12-03T00\:00\:00+00\:00/1.log
[2018-12-12 13:17:47,748] {logging_mixin.py:95} INFO - [2018-12-12 13:17:47,748] {hello_dag.py:9} INFO - goodbye world [2018-12-12 13:17:47,748] {python_operator.py:96} INFO - Done. Returned value was: None
schedulerからの実行
Webサーバーを立ち上げる。
$ airflow webserver -p 8080
画面にhello_dagが追加されている。但しスケジューラーには登録されていないので「This DAG seems to be eisiting only locally. The master scheduler doesn't seem to be aware of its existene.(DAGがローカルにあるみたいだけどスケジューラーはまだ気づいてないよ)」というメッセージが表示される。
スケジューラーを起動してみる。
$ airflow scheduler
実行してしばし待つとスケジューラーが自動で新規に登録されたdagを認識してくれる。
Graph Viewで見るとこんな感じでhello, goodbyeがシーケンシャルに実行されるようになっている。
登録直後は画面上でOffが設定されているのでOnに変えてschedule側の実行を待つ。
今回はcronの設定を毎分にしたので$AIRFLOW_HOME/logs配下にstart_dateで指定された日時から現在までの毎分の実行が行われてログがどんどん増えていく。
Browse → Task Instancesを見ると各タスクの実行結果が表示される。
一番上のfiledは例としてhello関数にraise Exceptionを書いて発生させた。
今回はPythonOperatorでPythonのコードを実行させたが、BashOperatorにコマンドを記載することで各種コードを呼び出すこともできる。
改定履歴
Author: Masato Watanabe, Date: 2019-01-29, 記事投稿