iMind Developers Blog

iMind開発者ブログ

Apache AirflowでHello World

概要

Airflowはスケジューリングやワークフローが制御できるソフトウェア。毎日数十数百のバッチを動かしていて管理が煩雑と感じている人が使うと幸せになれる。

Pythonのスクリプトで記述できたりpipで手軽に入れられるところがPythonユーザー的には使いやすい。

今回は導入からhello worldとログに残すだけの簡易な記述を動かすところまでを実行する。

バージョン情報

  • Python 3.6.7
  • apache-airflow==1.10.2

導入

他のPython実行環境とわけた方がいいケースが多いので、Airflow用のcondaのenvを作ってそこに入れてみる。

$ conda create -n airflow
$ source activate airflow
$ pip install apache-airflow

が、こんなエラーが出た。

RuntimeError: By default one of Airflow's dependencies installs a GPL dependency (unidecode). To avoid this dependency set SLUGIFY_USES_TEXT_UNIDECODE=yes in your environment when you install or upgrade Airflow. To force installing the GPL version set AIRFLOW_GPL_UNIDECODE

airflow自体はApacheライセンスだけどslugifyが使っているunidecodeがGPLだけどどうするべ、みたいなことが聞かれている。気にしないので下記でyesを指定してインストール。

AIRFLOW_GPL_UNIDECODE=yes pip install apache-airflow

下記公式サイトの手順に従ってざっくり動かしてみる

https://airflow.apache.org/start.html

$ airflow initdb
$ airflow webserver -p 8080

これで http://localhost:8080/ にリクエストするとAirflowの画面が確認できる。いろいろなサンプルが入っているので内容を確認していくと使い方がなんとなくわかる。

デフォルトではWebサーバーはFlask、DBはSQLiteで 動いていて ~/airflow がAIRFLOW_HOMEとして扱われる。AIRFLOW_HOME配下にはSQLiteのdbファイルや各種設定ファイルが配置される。

hello world

さっそくdagを記述してAirflowでhello worldしてみる。

自動生成された $AIRFLOW_HOME//airflow.cfg によるとdagは $AIRFLOW_HOME/dags に配置することになっている。

dags_folder = /home/user/airflow/dags

ちなみにairflow.cfgのパス指定はこう書いても大丈夫。

dags_folder = ~/airflow/dags

当該フォルダを作る

$ cd ~/airflow
$ mkdir dags
$ cd dags

dags配下に下記の内容を記述した hello_dag.py というファイルを置く。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
import datetime, logging

# 今回呼び出すhello world関数
def hello():
    logging.info('hello world')

# 同goodbye関数
def goodbye():
    logging.info('goodbye world')

# 必須項目のownerとstart_dateを埋める。start_dateはスケジュールの開始日時。現在に近い日時を入れておく。
args = {
    'owner': 'masato watanabe',
    'start_date': datetime.datetime(2018, 12, 1)
}

# DAGの設定。schedule_intervalはcron形式で書ける。ここでは毎日0時を設定。
dag = DAG('hello_dag', default_args=args, schedule_interval='0 0 * * *')

# hello world用のPythonOperator
hello = PythonOperator(
    task_id='hello',
    python_callable=hello,
    dag=dag
)

# 同goodbye用のOperator
goodbye = PythonOperator(
    task_id='goodbye',
    python_callable=goodbye,
    dag=dag
)

# hello, goodbyeの実行
hello >> goodbye

helloの方をrunしてみる。

airflow runでdag_id、task_id、日時を指定して実行する。

$ airflow run hello_dag hello 2018-12-03T00:00

実行すると下記にログができている。

../logs/hello_dag/hello/2018-12-03T00\:00\:00+00\:00/1.log

ログの中にはlogging.infoした情報と、Return Value(何も返してないのでNone)が記載されている。

[2018-12-12 13:16:34,228] {logging_mixin.py:95} INFO - [2018-12-12 13:16:34,227] {hello_dag.py:6} INFO - hello world
[2018-12-12 13:16:34,228] {python_operator.py:96} INFO - Done. Returned value was: None

goodbyeの方も実行。

$ airflow run hello_dag goodbye 2018-12-03T00:00

hello_dag/goodbyeの方にログが出力される。

 ../logs/hello_dag/goodbye/2018-12-03T00\:00\:00+00\:00/1.log
[2018-12-12 13:17:47,748] {logging_mixin.py:95} INFO - [2018-12-12 13:17:47,748] {hello_dag.py:9} INFO - goodbye world
[2018-12-12 13:17:47,748] {python_operator.py:96} INFO - Done. Returned value was: None

schedulerからの実行

Webサーバーを立ち上げる。

$ airflow webserver -p 8080

画面にhello_dagが追加されている。但しスケジューラーには登録されていないので「This DAG seems to be eisiting only locally. The master scheduler doesn't seem to be aware of its existene.(DAGがローカルにあるみたいだけどスケジューラーはまだ気づいてないよ)」というメッセージが表示される。

f:id:imind:20181213183040p:plain

スケジューラーを起動してみる。

$ airflow scheduler

実行してしばし待つとスケジューラーが自動で新規に登録されたdagを認識してくれる。

Graph Viewで見るとこんな感じでhello, goodbyeがシーケンシャルに実行されるようになっている。

f:id:imind:20181213195947p:plain

登録直後は画面上でOffが設定されているのでOnに変えてschedule側の実行を待つ。

今回はcronの設定を毎分にしたので$AIRFLOW_HOME/logs配下にstart_dateで指定された日時から現在までの毎分の実行が行われてログがどんどん増えていく。

Browse → Task Instancesを見ると各タスクの実行結果が表示される。

f:id:imind:20181213200400p:plain

一番上のfiledは例としてhello関数にraise Exceptionを書いて発生させた。

今回はPythonOperatorでPythonのコードを実行させたが、BashOperatorにコマンドを記載することで各種コードを呼び出すこともできる。

改定履歴

Author: Masato Watanabe, Date: 2019-01-29, 記事投稿