iMind Developers Blog

iMind開発者ブログ

GCPのCloud ComposerでHello World

概要

Airflowの運用をGCP任せにできるCloud Composerを使ってPythonOperatorやBashOperatorを叩いてみる。

バージョン情報

  • Google Cloud SDK 264.0.0
  • composer-1.7.5-airflow-1.10.2

起動

GCPのWebUIからcomposerを選択し、environmentを作成する。

f:id:imind:20190928164218p:plain

各項目には下記を入力した。

項目 補足
Name example
Node count 3
Location asia-northeast1
Zone asia-northeast1-b どれでもいい
Machine type n1-standard-1 Workerのタイプ
Disk size(GB) 20
Image version composer-1.7.5-airflow-1.10.2
Python version 3

これで立ち上げるとComposerが用意される。(立ち上がるのにけっこう時間がかかった)

Airflow webserverからAirflowのWebUIが表示できる。またDAGsからdagファイルの置かれているCloud Storageへのリンクが表示される。

f:id:imind:20190928164256p:plain

Composerの料金

2019年10月時点で利用料金例ではアイオワリージョンで$76.06と想定されている。東京だと20%強上がるので月1万円近く見込む必要がある。けっこう高い。

https://cloud.google.com/composer/pricing?hl=ja

試した後はちゃんと止めておこう。

Hello World PythonOperator

まずはPythonOperatorで毎分hello worldとprintするだけのDAGを登録してみる。

Composerの画面からDAGsを選ぶとCloud Strageが開く。開いたパスにはairflow_monitoring.pyというDGAが記述されたファイル(中身はBashOperatorでechoしてるだけ)が置いてある。

バケットは自動生成された新規のものが使われている。

ここに適当なDAGを置けばAirflowが読み込んでくれる。

試しに下記のようなDAGをexample_dag.pyという名前で保存する。

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'hello_world',
    default_args=default_args,
    schedule_interval='0 * * * *')

def hello():
    print('hello world')

t1 = PythonOperator(
    task_id='hello',
    python_callable=hello,
    dag=dag)

gsutilを使って上記dagをcloud storageにupload。

# BUCKET_NAMEにはDAGsから開いたバケットの名前を入れる
$ gsutil cp example_dag.py gs://${BUCKET_NAME}/dags/example_dag.py

しばらく待ってAirflowのUIを開くと、配置したhello_worldが出てくる。

f:id:imind:20190928170243p:plain

Browse → Task Instancesを見ると実行されたタスクが出てくるので、完了しているタスクのログを見てみる。

[2019-09-28 06:40:11,089] {logging_mixin.py:95} INFO - hello world

hello worldと書かれたログが発見できる。

BashOperatorでファイル出力

続いてBashOperatorを使ってファイルを出力してみる。

先ほどのPythonをちょっと修正してBashOperatorの中でファイルに日時を追記し続ける処理を入れてみる。

from airflow.operators.bash_operator import BashOperator

t1 = PythonOperator(
    task_id='hello',
    python_callable=hello,
    dag=dag)

cmd = 'echo `date +%Y%m%d%H%M%S` >> /home/airflow/gcs/data/foo.txt'
t2 = BashOperator(
    task_id='hello2',
    bash_command=cmd,
    dag=dag)

t1 >> t2

/home/airflow/gcs/dataというディレクトリに出力しているが、このパスはFuseを使ってDAGを置いたのと同じバケットのdataフォルダにマウントされているらしい。

上記のファイルをcloud storageにupload。

$ gsutil cp example_dag.py gs://${BUCKET_NAME}/dags/example_dag.py

Graph Viewを確認すると今回追加したhello2がグラフに出てくる。

f:id:imind:20190928170415p:plain

cloud storageを見るとちゃんとdataフォルダにファイルが追加されている。

改定履歴

Author: Masato Watanabe, Date: 2019-10-03, 記事投稿