概要
Airflowの運用をGCP任せにできるCloud Composerを使ってPythonOperatorやBashOperatorを叩いてみる。
バージョン情報
- Google Cloud SDK 264.0.0
- composer-1.7.5-airflow-1.10.2
起動
GCPのWebUIからcomposerを選択し、environmentを作成する。
各項目には下記を入力した。
項目 | 値 | 補足 |
---|---|---|
Name | example | |
Node count | 3 | |
Location | asia-northeast1 | |
Zone | asia-northeast1-b | どれでもいい |
Machine type | n1-standard-1 | Workerのタイプ |
Disk size(GB) | 20 | |
Image version | composer-1.7.5-airflow-1.10.2 | |
Python version | 3 |
これで立ち上げるとComposerが用意される。(立ち上がるのにけっこう時間がかかった)
Airflow webserverからAirflowのWebUIが表示できる。またDAGsからdagファイルの置かれているCloud Storageへのリンクが表示される。
Composerの料金
2019年10月時点で利用料金例ではアイオワリージョンで$76.06と想定されている。東京だと20%強上がるので月1万円近く見込む必要がある。けっこう高い。
https://cloud.google.com/composer/pricing?hl=ja
試した後はちゃんと止めておこう。
Hello World PythonOperator
まずはPythonOperatorで毎分hello worldとprintするだけのDAGを登録してみる。
Composerの画面からDAGsを選ぶとCloud Strageが開く。開いたパスにはairflow_monitoring.pyというDGAが記述されたファイル(中身はBashOperatorでechoしてるだけ)が置いてある。
バケットは自動生成された新規のものが使われている。
ここに適当なDAGを置けばAirflowが読み込んでくれる。
試しに下記のようなDAGをexample_dag.pyという名前で保存する。
from airflow import DAG from airflow.operators.python_operator import PythonOperator from datetime import timedelta default_args = { 'start_date': airflow.utils.dates.days_ago(0), 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG( 'hello_world', default_args=default_args, schedule_interval='0 * * * *') def hello(): print('hello world') t1 = PythonOperator( task_id='hello', python_callable=hello, dag=dag)
gsutilを使って上記dagをcloud storageにupload。
# BUCKET_NAMEにはDAGsから開いたバケットの名前を入れる $ gsutil cp example_dag.py gs://${BUCKET_NAME}/dags/example_dag.py
しばらく待ってAirflowのUIを開くと、配置したhello_worldが出てくる。
Browse → Task Instancesを見ると実行されたタスクが出てくるので、完了しているタスクのログを見てみる。
[2019-09-28 06:40:11,089] {logging_mixin.py:95} INFO - hello world
hello worldと書かれたログが発見できる。
BashOperatorでファイル出力
続いてBashOperatorを使ってファイルを出力してみる。
先ほどのPythonをちょっと修正してBashOperatorの中でファイルに日時を追記し続ける処理を入れてみる。
from airflow.operators.bash_operator import BashOperator t1 = PythonOperator( task_id='hello', python_callable=hello, dag=dag) cmd = 'echo `date +%Y%m%d%H%M%S` >> /home/airflow/gcs/data/foo.txt' t2 = BashOperator( task_id='hello2', bash_command=cmd, dag=dag) t1 >> t2
/home/airflow/gcs/dataというディレクトリに出力しているが、このパスはFuseを使ってDAGを置いたのと同じバケットのdataフォルダにマウントされているらしい。
上記のファイルをcloud storageにupload。
$ gsutil cp example_dag.py gs://${BUCKET_NAME}/dags/example_dag.py
Graph Viewを確認すると今回追加したhello2がグラフに出てくる。
cloud storageを見るとちゃんとdataフォルダにファイルが追加されている。
改定履歴
Author: Masato Watanabe, Date: 2019-10-03, 記事投稿