概要
Airflowのタスクが失敗した際にSlackにメッセージを送るようにする。
トークン等はVariablesに保存して扱う。
バージョン情報
- Python 3.6.7
- apache-airflow==1.10.1
- slackclient==1.3.0
導入
slackclientが必要になるので入れておく。
$ pip install slackclient
Variablesの設定
下記あたりを参考にSlackのトークンを取得しておく。
https://api.slack.com/incoming-webhooks
AirflowのWebサーバーを立ち上げる
$ airflow webserver -p8080
Admin → Variablesを選択。
CreateタブからVariableを生成する。
公式ドキュメントによるとVariableのキー名がパスワードっぽいものについては値がマスクされるらしい。
具体的には‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’のいずれかが文字列に含まれていれば対象になる。
The variable view allows you to list, create, edit or delete the key-value pair of a variable used during jobs. Value of a variable will be hidden if the key contains any words in (‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’) by default, but can be configured to show in clear-text.
Slackのトークンについてはもちろんマスクされたい情報なので slack_access_token という名前で登録する。
Valがマスクされた状態で登録されている。
SlackAPIPostOperatorでメッセージ送信
SlackAPIPostOperatorを利用してPythonOperatorのタスク失敗時にexceptionのメッセージをSlackに通知してみる。
from airflow import DAG from airflow.models import Variable from airflow.operators.slack_operator import SlackAPIPostOperator from airflow.operators.python_operator import PythonOperator import datetime def send_slack(context): ''' 処理失敗時のcallback ''' op = SlackAPIPostOperator( task_id='task_failure', token=Variable.get("slack_access_token"), text=str(context['exception']), channel='#room-name', username='user-name') return op.execute(context=context) # 必ず例外が発生する関数 def hello(): raise Exception('テスト') args = { 'owner': 'masato watanabe', 'start_date': datetime.datetime(2018, 12, 15) } dag = DAG('slack_example', default_args=args, schedule_interval='0 0 * * *') # on_failure_callbackに上で定義したsend_slackを入れる PythonOperator( task_id='hello', python_callable=hello, on_failure_callback=send_slack, dag=dag )
これで準備OK。
airflow runで上で定義したhelloタスクを叩くと、Slackに例外で設定したメッセージ(ここではテスト)が送信される。
$ airflow run slack_example hello 2018-12-15T00:00
context['exception'] の中身は例外インスタンスなので、 isinstance(context['exception'], SomeException) などで判定して例外のタイプに応じて動作を変えることもできる。
BashOperatorの場合
BashOperatorの場合の挙動も見てみる。slack_sendのコードはそのままに、PythonOperatorを利用していたhelloタスクをBashOperatorに切り替える。DAGの名前もslack_example_bashと変えておく。
from airflow.operators.bash_operator import BashOperator dag = DAG('slack_example_bash', default_args=args, schedule_interval='0 0 * * *') # exit codeで1を返すbashを叩くOperator BashOperator( task_id='hello', bash_command='exit 1', on_failure_callback=send_slack, dag=dag )
実行
$ airflow run slack_example_bash hello 2018-12-15T00:00
実行すると「Bash command failed」というエラメッセージがSlackに飛んだ。中身を見てみるとAirflowExceptionという例外が入っていた。
AirflowException('Bash command failed',)
BashOperatorでエラーメッセージが欲しい
XCom(feature for operator cross-communication)を使ってBashOperatorからメッセージを受け取る。
BashOperatorではXComは下記のような動きになると書かれている。
If xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes.
https://airflow.incubator.apache.org/code.html?highlight=bashoperator#airflow.operators.BashOperator
訳すと「xcom_pushをTrueに設定すると、stdoutに最後に書かれた内容がbash終了時にXComにpushされるよ」ということらしい。
下記のように記述すると最後のechoに出力した値を取得できる。
hello = BashOperator( task_id='hello', xcom_push=True, bash_command='exit 1 | echo hogehoge', on_failure_callback=send_slack, dag=dag ) def send_slack(context): ''' 処理失敗時のcallback ''' bash_message = context['task_instance'].xcom_pull(task_ids='hello') # (以下略)
TriggerRuleでエラーを送るケースを設定する
すべてのタスクにon_failure_callbackを設定するとタスクごとにエラーが投げられることになる。TriggerRuleを利用して最後に1回だけエラーが投げられるように設定してみる。
TriggerRuleには下記のような指定が用意されている。
- ALL_SUCCESS
- ALL_FAILED
- ALL_DONE
- ONE_SUCCESS
- ONE_FAILED
- DUMMY
- NONE_FAILED
下記はop1(失敗する)、op2(成功する)、slackOp(slackに投げる)という3つのOperatorを用意している。
args = { 'owner': 'masato watanabe', 'start_date': datetime.datetime(2018, 12, 15) } dag = DAG('slack_example_bash', default_args=args, schedule_interval='*/10 * * * *') # 失敗するBashOperator op1 = BashOperator( task_id='op1', xcom_push=True, bash_command='exit 1', dag=dag ) # 成功するBashOperator op2 = BashOperator( task_id='op2', xcom_push=True, bash_command='exit 0', dag=dag ) # ALL_FAILEDで実行されるSlackAPIPostOperator slackOp = SlackAPIPostOperator( task_id='slack_op', token=Variable.get("slack_access_token"), text='all failed: {{ ts }}', channel='#room-name', username='user-name', trigger_rule=TriggerRule.ALL_FAILED, dag=dag) op1 >> slackOp op2 >> slackOp
trigger_ruleはParentのタスクすべてにかかるので、SlackAPIはop1, op2の双方をParentに持つようにしておく。
これで双方がエラーになればALL_FAILEDが満たされてSlackにメッセージが飛ぶが、今回はop2は成功するのでメッセージは飛ばない。triggerをONE_FAILEDに変更するとエラーが飛ぶ。
textに指定している {{ ts }} はcontextに入っているtimestamp。エラーが起きた場合は下記のようなメッセージがSlackに送信される。
all failed: 2018-12-17T14:00:00+00:00
SlackWebhookOperatorを使う
ここまでは SlackAPIPostOperator を使ってきたが SlackWebhookOperator も使ってみる。
WebGUIの Admin → Connections を開いてslackのwebhook用のConnectionを生成しておく。
HostのところにURLを書いているのが奇妙な感じがするけど、airflowのソースコードを見ると下記のように :// が入っていたらbase_urlとして扱うとしているのでこれで良いらしい。
if "://" in conn.host: self.base_url = conn.host
webhook urlに使うToken(webhook urlのservice以降の部分)についてはVariablesの方に登録しておく。
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator slackOp = SlackWebhookOperator( task_id='slack_op', http_conn_id='slack_webhook', webhook_token=Variable.get("slack_webhook_access_token"), message='one failed: {{ ts }}', channel='#room-name', username='user-name', trigger_rule=TriggerRule.ONE_FAILED, dag=dag)
slackOpを上記のように変更すると、下記のようなメッセージがSlackに送信される。テンプレートを展開はしてくれないようだ。
one failed: {{ ts }}
テンプレートの展開はクラスの中でtemplate_fields変数を定義するだけなので、下記のような自作のクラスを用意すれば展開できる。
class MySlackWebhookOperator(SlackWebhookOperator): template_fields = ('username', 'message', 'channel')
改定履歴
Author: Masato Watanabe, Date: 2019-02-08, 記事投稿