iMind Developers Blog

iMind開発者ブログ

AirflowのCheckOperatorでDBの値チェック

概要

AirflowのCheckOperatorを使ってテーブルのカウント等のチェックを行う。

バージョン情報

  • Python 3.6.7
  • apache-airflow==1.10.3

CheckOperatorの種類

name description
CheckOperator 0やemptyをチェックする
ValueCheckOperator 値が指定値と同じか、指定範囲に収まるかをチェックする
IntervalCheckOperator 指定期間のカウントと比べて大きく外れていないかチェックする

本記事でやること

  1. PostgreSQLにtimestampと数値が入ったテーブルを用意
  2. 各種CheckOperatorを使って値をチェック
  3. エラーであればSlackにメッセージを飛ばす

サンプルデータの用意

timestampとvalueを持つテーブルを作り、4/1〜4/3の3日間、4件ずつデータを入れる。

create table example (dt timestamp, value bigint);

insert into example values
('2019-04-01 00:00', 10) ,
('2019-04-01 01:00', 15) ,
('2019-04-01 02:00', 8) ,
('2019-04-01 03:00', 7) ,
('2019-04-02 00:00', 9) ,
('2019-04-02 01:00', 12) ,
('2019-04-02 02:00', 11) ,
('2019-04-02 03:00', 9) ,
('2019-04-03 00:00', 8) ,
('2019-04-03 01:00', 7) ,
('2019-04-03 02:00', 7) ,
('2019-04-03 03:00', 15) ;

これで件数が「普段は件数4件なのに今日は2件しかいない」とか「普段のvalueは7〜15の間なのに急に1や100が来た」といった現象を検知できるようなコードを書く。

Airflowから利用する為に、Connectionsに接続情報を指定。Web画面から Admin → Connectionsを選択する。

f:id:imind:20181221004713p:plain

mysql_defaultとかpostgresql_default等、扱うDBの種類に応じたデフォルト設定を選択し、接続情報を入力してSaveする。

今回はpostgresqlを利用するということで下記を設定。

f:id:imind:20181221005058p:plain

CheckOperatorの利用

設定したConnectionを使ってCheckOperatorを記述してみる。

CheckOperatorはSQLとconn_idを引数に取る。下記のように記述する。

from airflow.operators.check_operator import CheckOperator
sql = "select count(*) from example where dt >= '2019-04-04' and dt < '2019-04-05'"
check_op = CheckOperator(
    sql=sql,
    conn_id='postgres_default',
    task_id='checker',
    dag=dag)

CheckOperatorがチェック失敗となる条件は下記。

  • False
  • 0
  • Empty string ("")
  • Empty list ([])
  • Empty dictionary or set ({})

検索してレコードが0だったりemptyだったりすると失敗扱いになる。

下記は0件だった場合のログ出力。Resultsが0でAirflowExceptionが発生している。

airflow.exceptions.AirflowException: Test failed.
Query:
select count(*) from example where dt >= '2019-04-04' and dt < '2019-04-05'
Results:
(0,)

件数が0件以上であれば成功する。sqlの条件をレコードが存在する日に切り替えて再実行。

sql = "select count(*) from example where dt >= '2019-04-03' and dt < '2019-04-04'"

4件取れるのでSuccess。

[2019-05-07 11:17:35,559] {check_operator.py:82} INFO - Record: (4,)
[2019-05-07 11:17:35,560] {check_operator.py:88} INFO - Success.

CheckOperatorのsql引数にはテンプレートが効くので、クエリのところは {{ ds }} や {{ ds_add(ds, -1) }} 等で置き換えられる。

sql = '''select count(*) from example
where dt >= '{{ macros.ds_add( ds, -1 ) }}' and dt < '{{ ds }}' '''

参考 : Airflowの日付マクロまとめ

チェックに引っかかったらSlackにエラーを投げる

下記ページで設定したSlackに投げるOperatorを利用する。Variableの設定等はリンク先を参照。

参考 : Airflowでタスク失敗時にSlackへメッセージを送る

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
slack_op = SlackWebhookOperator(
          task_id='slack_op',
          http_conn_id='slack_webhook',
          webhook_token=Variable.get("slack_webhook_access_token"),
          message='count failed: {{ ts }}',
          channel='#test-room',
          username='test-user',
          trigger_rule=TriggerRule.ONE_FAILED,
          dag=dag)

triggerをONE_FAILED(何かしらミスったら動かす)にしているので、下記のようにcheckのdownstreamにslackを噛ませれば、失敗したら投げる挙動になる。

check_op  >> slack_op

また on_failure_callbackに指定しても良い。

def on_failure(context):
    from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
    slack_op = SlackWebhookOperator(
              task_id='slack_op',
              http_conn_id='slack_webhook',
              webhook_token=Variable.get("slack_webhook_access_token"),
              message='count failed: {{ ts }}',
              channel='#test-room',
              username='test-user',
              trigger_rule=TriggerRule.ONE_FAILED,
              dag=dag)
    return slack_op.execute(context)

sql = 'select count(*) from table_name'
check_op = CheckOperator(
    sql=sql,
    conn_id='postgres_default',
    task_id='checker',
    dag=dag,
    on_failure_callback=on_failure)

ValueCheckOperatorの利用

ValueCheckOperatorは期待した戻り値でなければfailedになる。

下記は戻り値に4を期待した場合。

from airflow.operators.check_operator import ValueCheckOperator
check_op = ValueCheckOperator(
    sql=sql,
    pass_value=4,
    conn_id='postgres_default',
    task_id='checker',
    dag=dag)

失敗すると下記のようなログが出力されfailedになる。Pass valueに4.0が指定されたけどResultsが0だったというログ。

airflow.exceptions.AirflowException: Test failed.
Pass value:4.0
Tolerance:None
Query: 
select count(*) from example where dt >= '2019-04-05' and dt < '2019-04-06'
Results:
(0,)

引数にtoleranceが指定してある場合は、下記のような式に合致する場合はパスできる。pass_valueで渡した値はp、toleranceで渡した値はtolとする。

p * (1 - tol) <= sql_result < p * (1 + tol)

下記はpass_valueを6、toleranceを0.2(20%のブレは許容)に指定している。つまり4.8〜7.2の間であればパスできる。

check_op = ValueCheckOperator(
    sql=sql,
    pass_value=6,
    tolerance=0.2
    conn_id='postgres_default',
    task_id='checker',
    dag=dag)

結果が4を返すSQLを実行すると範囲外になるのでfailedになる。

airflow.exceptions.AirflowException: Test failed.
Pass value:6.0
Tolerance:20.0%
Query:
select count(*) from example where dt >= '2019-04-01' and dt < '2019-04-02'
Results:
(4,)

IntervalCheckOperatorの利用

IntervalCheckOperatorは過去データと値を比較する。

デフォルトだと7日前の値と比較する。SQL的には下記のような2つを発行し比較することになる。

SELECT {sqlexp} FROM {table}
    WHERE {date_filter_column}={ds}

SELECT {sqlexp} FROM {table}
    WHERE {date_filter_column}={{ macros.ds_add(ds, -7) }}

指定できる引数は下記。

引数 必須 意味
table テーブル名
metrics_thresholds dict型。sqlexpに入るSQL表現がkey、閾値(int)がValue
date_filter_column 日付カラム(default=ds)
days_back n日以前と比較する(default=-7)
ratio_formula 閾値チェック。max_over_minかrelative_diff (default=max_over_min)
ignore_zero SQLの結果が0の場合のテスト結果(default=True=Success)

引数ratio_formulaに指定されるmax_over_minとrelative_diffはそれぞれこんな意味。curが現在値でrefがn日前の数字。

'max_over_min': lambda cur, ref: float(max(cur, ref)) / min(cur, ref)

'relative_diff': lambda cur, ref: float(abs(cur - ref)) / ref

概要がわかったので実演してみる。exampleテーブルのレコード数、valueのmean、valueのmaxの3つを前日比(days_back=-1を指定)でチェックしてみる。

今回は日付型が入るところがtimestampになっているので、date_filter_columnはdateにcastする記述を入れる。

thresholdは1.2(max / min < 1.2)を指定。

metrics_thresholds = {
    "count(*)": 1.2,
    "cast(avg(value) as float)": 1.2,
    "max(value)": 1.2
}

from airflow.operators.check_operator import IntervalCheckOperator
check_op = IntervalCheckOperator(
    table='example',
    metrics_thresholds=metrics_thresholds,
    days_back=-1,
    date_filter_column='cast(dt as date)',
    conn_id='postgres_default',
    task_id='checker',
    dag=dag)

avgをcastしているが、これはavgの戻り値がDecimal型になって下記のようなエラーが出た為。

ERROR - unsupported operand type(s) for /: 'float' and 'decimal.Decimal'

これを実行するとcountとavgはパスするがmaxが1.25の為、failedになる。

INFO - Ratio for cast(avg(value) as float): 1.1081081081081081
INFO - Ratio for count(*): 1.0
INFO - Ratio for max(value): 1.25

WARNING - The following 1 tests out of 3 failed:
WARNING - 'max(value)' check failed. 1.25 is above 1.2
ERROR - The following tests have failed:
 max(value)

自作Operatorでチェックする

IntervalCheckOperatorだと障害が起きて異常値が発生した日があった場合に、その日だけでなく翌週にチェックを行った場合にも引っかかることになる。

もう少し自由な比較がしたいので、2つのSQLを引数に指定して実行結果を比較する自作のOperatorを書いてみる。

下記がOperatorの記述。行数削減の為にIntervalCheckOperatorを継承しているけど、ちゃんと作るならBaseOperatorを継承して自前で全部書いた方が良いと思う。

from airflow.utils.decorators import apply_defaults
from airflow.operators.check_operator import IntervalCheckOperator

class SqlDiffCheckOperator(IntervalCheckOperator):

    @apply_defaults
    def __init__(
        self,
        sql1,
        sql2,
        metrics_thresholds,  # type: Dict[str, float]
        date_filter_column='ds',  # type: Optional[str]
        ratio_formula='max_over_min',  # type: Optional[str]
        ignore_zero=True,  # type: Optional[bool]
        conn_id=None,  # type: Optional[str]
        *args, **kwargs
    ):
        super().__init__(
            table='dummy', metrics_thresholds=metrics_thresholds,
            ratio_formula=ratio_formula, ignore_zero=ignore_zero,
            conn_id=conn_id, *args, **kwargs)
        self.sql1 = sql1
        self.sql2 = sql2

dag側の記述。maxやavgはそのまま比較できるけど、countは7日分取ると7倍になってしまうので7で割っている。

metrics_thresholds = {
    "cnt": 1.2,
    "avg": 1.2,
    "max": 1.2
}

sql1 = """select
count(*) cnt, cast(avg(value) as float) avg, max(value) max
from example where dt=""" + "'{{ ds }}'"

sql2 = """select
cast(count(*) / 7.0 as float) cnt, cast(avg(value) as float) avg, max(value) max
from example where dt between """ + \
"'{{ macros.ds_add(ds, -8) }}' AND '{{ macros.ds_add(ds, -1) }}'"

from airflow.operators.check_operator import IntervalCheckOperator
check_op = SqlDiffCheckOperator(
    sql1=sql1,
    sql2=sql2,
    metrics_thresholds=metrics_thresholds,
    conn_id='postgres_default',
    task_id='checker',
    dag=dag)

実行結果。where句のtimestampのcastを書き忘れたので意図した結果になってないけどまあいいや。

WARNING - The following 3 tests out of 3 failed:
WARNING - 'avg' check failed. 1.4000000000000006 is above 1.2
WARNING - 'cnt' check failed. 1.225 is above 1.2
WARNING - 'max' check failed. 1.875 is above 1.2
ERROR - The following tests have failed:

この手の汎用的に使えそうなOperatorを用意しておいて、継承してcount用やaverage用のOperatorをそれぞれ作ったり、SQL生成のところはマクロにしたりすると便利そう。

改定履歴

Author: Masato Watanabe, Date: 2019-05-09, 記事投稿