iMind Developers Blog

iMind開発者ブログ

Airflowでタスク失敗時にSlackへメッセージを送る

概要

Airflowのタスクが失敗した際にSlackにメッセージを送るようにする。

トークン等はVariablesに保存して扱う。

バージョン情報

  • Python 3.6.7
  • apache-airflow==1.10.1
  • slackclient==1.3.0

導入

slackclientが必要になるので入れておく。

$ pip install slackclient

Variablesの設定

下記あたりを参考にSlackのトークンを取得しておく。

https://api.slack.com/incoming-webhooks

AirflowのWebサーバーを立ち上げる

$ airflow webserver -p8080

Admin → Variablesを選択。

f:id:imind:20181215180833p:plain

CreateタブからVariableを生成する。

公式ドキュメントによるとVariableのキー名がパスワードっぽいものについては値がマスクされるらしい。

具体的には‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’のいずれかが文字列に含まれていれば対象になる。

The variable view allows you to list, create, edit or delete the key-value pair of a variable used during jobs. Value of a variable will be hidden if the key contains any words in (‘password’, ‘secret’, ‘passwd’, ‘authorization’, ‘api_key’, ‘apikey’, ‘access_token’) by default, but can be configured to show in clear-text.

https://airflow.readthedocs.io/en/stable/ui.html

Slackのトークンについてはもちろんマスクされたい情報なので slack_access_token という名前で登録する。

f:id:imind:20181215195535p:plain

Valがマスクされた状態で登録されている。

SlackAPIPostOperatorでメッセージ送信

SlackAPIPostOperatorを利用してPythonOperatorのタスク失敗時にexceptionのメッセージをSlackに通知してみる。

from airflow import DAG
from airflow.models import Variable
from airflow.operators.slack_operator import SlackAPIPostOperator
from airflow.operators.python_operator import PythonOperator
import datetime

def send_slack(context):
    ''' 処理失敗時のcallback
    '''
    op = SlackAPIPostOperator(
          task_id='task_failure',
          token=Variable.get("slack_access_token"),
          text=str(context['exception']),
          channel='#room-name',
          username='user-name')
    return op.execute(context=context)

# 必ず例外が発生する関数
def hello():
    raise Exception('テスト')

args = {
    'owner': 'masato watanabe',
    'start_date': datetime.datetime(2018, 12, 15)
}

dag = DAG('slack_example', default_args=args, schedule_interval='0 0 * * *')

# on_failure_callbackに上で定義したsend_slackを入れる
PythonOperator(
    task_id='hello',
    python_callable=hello,
    on_failure_callback=send_slack,
    dag=dag
)

これで準備OK。

airflow runで上で定義したhelloタスクを叩くと、Slackに例外で設定したメッセージ(ここではテスト)が送信される。

$ airflow run slack_example hello 2018-12-15T00:00

context['exception'] の中身は例外インスタンスなので、 isinstance(context['exception'], SomeException) などで判定して例外のタイプに応じて動作を変えることもできる。

BashOperatorの場合

BashOperatorの場合の挙動も見てみる。slack_sendのコードはそのままに、PythonOperatorを利用していたhelloタスクをBashOperatorに切り替える。DAGの名前もslack_example_bashと変えておく。

from airflow.operators.bash_operator import BashOperator

dag = DAG('slack_example_bash', default_args=args, schedule_interval='0 0 * * *')

# exit codeで1を返すbashを叩くOperator
BashOperator(
    task_id='hello',
    bash_command='exit 1',
    on_failure_callback=send_slack,
    dag=dag
)

実行

$ airflow run slack_example_bash hello 2018-12-15T00:00

実行すると「Bash command failed」というエラメッセージがSlackに飛んだ。中身を見てみるとAirflowExceptionという例外が入っていた。

AirflowException('Bash command failed',)

BashOperatorでエラーメッセージが欲しい

XCom(feature for operator cross-communication)を使ってBashOperatorからメッセージを受け取る。

BashOperatorではXComは下記のような動きになると書かれている。

If xcom_push is True, the last line written to stdout will also be pushed to an XCom when the bash command completes.

https://airflow.incubator.apache.org/code.html?highlight=bashoperator#airflow.operators.BashOperator

訳すと「xcom_pushをTrueに設定すると、stdoutに最後に書かれた内容がbash終了時にXComにpushされるよ」ということらしい。

下記のように記述すると最後のechoに出力した値を取得できる。

hello = BashOperator(
    task_id='hello',
    xcom_push=True,
    bash_command='exit 1 | echo hogehoge',
    on_failure_callback=send_slack,
    dag=dag
)

def send_slack(context):
    ''' 処理失敗時のcallback
    '''
    bash_message = context['task_instance'].xcom_pull(task_ids='hello')
    # (以下略)

TriggerRuleでエラーを送るケースを設定する

すべてのタスクにon_failure_callbackを設定するとタスクごとにエラーが投げられることになる。TriggerRuleを利用して最後に1回だけエラーが投げられるように設定してみる。

TriggerRuleには下記のような指定が用意されている。

  • ALL_SUCCESS
  • ALL_FAILED
  • ALL_DONE
  • ONE_SUCCESS
  • ONE_FAILED
  • DUMMY
  • NONE_FAILED

下記はop1(失敗する)、op2(成功する)、slackOp(slackに投げる)という3つのOperatorを用意している。

args = {
    'owner': 'masato watanabe',
    'start_date': datetime.datetime(2018, 12, 15)
}

dag = DAG('slack_example_bash', default_args=args, schedule_interval='*/10 * * * *')

# 失敗するBashOperator
op1 = BashOperator(
    task_id='op1',
    xcom_push=True,
    bash_command='exit 1',
    dag=dag
)

# 成功するBashOperator
op2 = BashOperator(
    task_id='op2',
    xcom_push=True,
    bash_command='exit 0',
    dag=dag
)

# ALL_FAILEDで実行されるSlackAPIPostOperator
slackOp = SlackAPIPostOperator(
          task_id='slack_op',
          token=Variable.get("slack_access_token"),
          text='all failed: {{ ts }}',
          channel='#room-name',
          username='user-name',
          trigger_rule=TriggerRule.ALL_FAILED,
          dag=dag)

op1 >> slackOp
op2 >> slackOp

f:id:imind:20181217225937p:plain

trigger_ruleはParentのタスクすべてにかかるので、SlackAPIはop1, op2の双方をParentに持つようにしておく。

これで双方がエラーになればALL_FAILEDが満たされてSlackにメッセージが飛ぶが、今回はop2は成功するのでメッセージは飛ばない。triggerをONE_FAILEDに変更するとエラーが飛ぶ。

textに指定している {{ ts }} はcontextに入っているtimestamp。エラーが起きた場合は下記のようなメッセージがSlackに送信される。

all failed: 2018-12-17T14:00:00+00:00

SlackWebhookOperatorを使う

ここまでは SlackAPIPostOperator を使ってきたが SlackWebhookOperator も使ってみる。

WebGUIの Admin → Connections を開いてslackのwebhook用のConnectionを生成しておく。

f:id:imind:20181217233040p:plain

HostのところにURLを書いているのが奇妙な感じがするけど、airflowのソースコードを見ると下記のように :// が入っていたらbase_urlとして扱うとしているのでこれで良いらしい。

if "://" in conn.host:
    self.base_url = conn.host

webhook urlに使うToken(webhook urlのservice以降の部分)についてはVariablesの方に登録しておく。

f:id:imind:20181217234523p:plain

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slackOp = SlackWebhookOperator(
          task_id='slack_op',
          http_conn_id='slack_webhook',
          webhook_token=Variable.get("slack_webhook_access_token"),
          message='one failed: {{ ts }}',
          channel='#room-name',
          username='user-name',
          trigger_rule=TriggerRule.ONE_FAILED,
          dag=dag)

slackOpを上記のように変更すると、下記のようなメッセージがSlackに送信される。テンプレートを展開はしてくれないようだ。

one failed: {{ ts }}

テンプレートの展開はクラスの中でtemplate_fields変数を定義するだけなので、下記のような自作のクラスを用意すれば展開できる。

class MySlackWebhookOperator(SlackWebhookOperator):
    template_fields = ('username', 'message', 'channel')

改定履歴

Author: Masato Watanabe, Date: 2019-02-08, 記事投稿