概要
AirflowのCheckOperatorを使ってテーブルのカウント等のチェックを行う。
バージョン情報
- Python 3.6.7
- apache-airflow==1.10.3
CheckOperatorの種類
name | description |
---|---|
CheckOperator | 0やemptyをチェックする |
ValueCheckOperator | 値が指定値と同じか、指定範囲に収まるかをチェックする |
IntervalCheckOperator | 指定期間のカウントと比べて大きく外れていないかチェックする |
本記事でやること
- PostgreSQLにtimestampと数値が入ったテーブルを用意
- 各種CheckOperatorを使って値をチェック
- エラーであればSlackにメッセージを飛ばす
サンプルデータの用意
timestampとvalueを持つテーブルを作り、4/1〜4/3の3日間、4件ずつデータを入れる。
create table example (dt timestamp, value bigint); insert into example values ('2019-04-01 00:00', 10) , ('2019-04-01 01:00', 15) , ('2019-04-01 02:00', 8) , ('2019-04-01 03:00', 7) , ('2019-04-02 00:00', 9) , ('2019-04-02 01:00', 12) , ('2019-04-02 02:00', 11) , ('2019-04-02 03:00', 9) , ('2019-04-03 00:00', 8) , ('2019-04-03 01:00', 7) , ('2019-04-03 02:00', 7) , ('2019-04-03 03:00', 15) ;
これで件数が「普段は件数4件なのに今日は2件しかいない」とか「普段のvalueは7〜15の間なのに急に1や100が来た」といった現象を検知できるようなコードを書く。
Airflowから利用する為に、Connectionsに接続情報を指定。Web画面から Admin → Connectionsを選択する。
mysql_defaultとかpostgresql_default等、扱うDBの種類に応じたデフォルト設定を選択し、接続情報を入力してSaveする。
今回はpostgresqlを利用するということで下記を設定。
CheckOperatorの利用
設定したConnectionを使ってCheckOperatorを記述してみる。
CheckOperatorはSQLとconn_idを引数に取る。下記のように記述する。
from airflow.operators.check_operator import CheckOperator sql = "select count(*) from example where dt >= '2019-04-04' and dt < '2019-04-05'" check_op = CheckOperator( sql=sql, conn_id='postgres_default', task_id='checker', dag=dag)
CheckOperatorがチェック失敗となる条件は下記。
False
0
- Empty string (
""
) - Empty list (
[]
) - Empty dictionary or set (
{}
)
検索してレコードが0だったりemptyだったりすると失敗扱いになる。
下記は0件だった場合のログ出力。Resultsが0でAirflowExceptionが発生している。
airflow.exceptions.AirflowException: Test failed. Query: select count(*) from example where dt >= '2019-04-04' and dt < '2019-04-05' Results: (0,)
件数が0件以上であれば成功する。sqlの条件をレコードが存在する日に切り替えて再実行。
sql = "select count(*) from example where dt >= '2019-04-03' and dt < '2019-04-04'"
4件取れるのでSuccess。
[2019-05-07 11:17:35,559] {check_operator.py:82} INFO - Record: (4,) [2019-05-07 11:17:35,560] {check_operator.py:88} INFO - Success.
CheckOperatorのsql引数にはテンプレートが効くので、クエリのところは {{ ds }} や {{ ds_add(ds, -1) }} 等で置き換えられる。
sql = '''select count(*) from example where dt >= '{{ macros.ds_add( ds, -1 ) }}' and dt < '{{ ds }}' '''
チェックに引っかかったらSlackにエラーを投げる
下記ページで設定したSlackに投げるOperatorを利用する。Variableの設定等はリンク先を参照。
参考 : Airflowでタスク失敗時にSlackへメッセージを送る
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator slack_op = SlackWebhookOperator( task_id='slack_op', http_conn_id='slack_webhook', webhook_token=Variable.get("slack_webhook_access_token"), message='count failed: {{ ts }}', channel='#test-room', username='test-user', trigger_rule=TriggerRule.ONE_FAILED, dag=dag)
triggerをONE_FAILED(何かしらミスったら動かす)にしているので、下記のようにcheckのdownstreamにslackを噛ませれば、失敗したら投げる挙動になる。
check_op >> slack_op
また on_failure_callbackに指定しても良い。
def on_failure(context): from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator slack_op = SlackWebhookOperator( task_id='slack_op', http_conn_id='slack_webhook', webhook_token=Variable.get("slack_webhook_access_token"), message='count failed: {{ ts }}', channel='#test-room', username='test-user', trigger_rule=TriggerRule.ONE_FAILED, dag=dag) return slack_op.execute(context) sql = 'select count(*) from table_name' check_op = CheckOperator( sql=sql, conn_id='postgres_default', task_id='checker', dag=dag, on_failure_callback=on_failure)
ValueCheckOperatorの利用
ValueCheckOperatorは期待した戻り値でなければfailedになる。
下記は戻り値に4を期待した場合。
from airflow.operators.check_operator import ValueCheckOperator check_op = ValueCheckOperator( sql=sql, pass_value=4, conn_id='postgres_default', task_id='checker', dag=dag)
失敗すると下記のようなログが出力されfailedになる。Pass valueに4.0が指定されたけどResultsが0だったというログ。
airflow.exceptions.AirflowException: Test failed. Pass value:4.0 Tolerance:None Query: select count(*) from example where dt >= '2019-04-05' and dt < '2019-04-06' Results: (0,)
引数にtoleranceが指定してある場合は、下記のような式に合致する場合はパスできる。pass_valueで渡した値はp、toleranceで渡した値はtolとする。
p * (1 - tol) <= sql_result < p * (1 + tol)
下記はpass_valueを6、toleranceを0.2(20%のブレは許容)に指定している。つまり4.8〜7.2の間であればパスできる。
check_op = ValueCheckOperator( sql=sql, pass_value=6, tolerance=0.2 conn_id='postgres_default', task_id='checker', dag=dag)
結果が4を返すSQLを実行すると範囲外になるのでfailedになる。
airflow.exceptions.AirflowException: Test failed. Pass value:6.0 Tolerance:20.0% Query: select count(*) from example where dt >= '2019-04-01' and dt < '2019-04-02' Results: (4,)
IntervalCheckOperatorの利用
IntervalCheckOperatorは過去データと値を比較する。
デフォルトだと7日前の値と比較する。SQL的には下記のような2つを発行し比較することになる。
SELECT {sqlexp} FROM {table} WHERE {date_filter_column}={ds} SELECT {sqlexp} FROM {table} WHERE {date_filter_column}={{ macros.ds_add(ds, -7) }}
指定できる引数は下記。
引数 | 必須 | 意味 |
---|---|---|
table | ○ | テーブル名 |
metrics_thresholds | ○ | dict型。sqlexpに入るSQL表現がkey、閾値(int)がValue |
date_filter_column | 日付カラム(default=ds) | |
days_back | n日以前と比較する(default=-7) | |
ratio_formula | 閾値チェック。max_over_minかrelative_diff (default=max_over_min) | |
ignore_zero | SQLの結果が0の場合のテスト結果(default=True=Success) |
引数ratio_formulaに指定されるmax_over_minとrelative_diffはそれぞれこんな意味。curが現在値でrefがn日前の数字。
'max_over_min': lambda cur, ref: float(max(cur, ref)) / min(cur, ref) 'relative_diff': lambda cur, ref: float(abs(cur - ref)) / ref
概要がわかったので実演してみる。exampleテーブルのレコード数、valueのmean、valueのmaxの3つを前日比(days_back=-1を指定)でチェックしてみる。
今回は日付型が入るところがtimestampになっているので、date_filter_columnはdateにcastする記述を入れる。
thresholdは1.2(max / min < 1.2)を指定。
metrics_thresholds = { "count(*)": 1.2, "cast(avg(value) as float)": 1.2, "max(value)": 1.2 } from airflow.operators.check_operator import IntervalCheckOperator check_op = IntervalCheckOperator( table='example', metrics_thresholds=metrics_thresholds, days_back=-1, date_filter_column='cast(dt as date)', conn_id='postgres_default', task_id='checker', dag=dag)
avgをcastしているが、これはavgの戻り値がDecimal型になって下記のようなエラーが出た為。
ERROR - unsupported operand type(s) for /: 'float' and 'decimal.Decimal'
これを実行するとcountとavgはパスするがmaxが1.25の為、failedになる。
INFO - Ratio for cast(avg(value) as float): 1.1081081081081081 INFO - Ratio for count(*): 1.0 INFO - Ratio for max(value): 1.25 WARNING - The following 1 tests out of 3 failed: WARNING - 'max(value)' check failed. 1.25 is above 1.2 ERROR - The following tests have failed: max(value)
自作Operatorでチェックする
IntervalCheckOperatorだと障害が起きて異常値が発生した日があった場合に、その日だけでなく翌週にチェックを行った場合にも引っかかることになる。
もう少し自由な比較がしたいので、2つのSQLを引数に指定して実行結果を比較する自作のOperatorを書いてみる。
下記がOperatorの記述。行数削減の為にIntervalCheckOperatorを継承しているけど、ちゃんと作るならBaseOperatorを継承して自前で全部書いた方が良いと思う。
from airflow.utils.decorators import apply_defaults from airflow.operators.check_operator import IntervalCheckOperator class SqlDiffCheckOperator(IntervalCheckOperator): @apply_defaults def __init__( self, sql1, sql2, metrics_thresholds, # type: Dict[str, float] date_filter_column='ds', # type: Optional[str] ratio_formula='max_over_min', # type: Optional[str] ignore_zero=True, # type: Optional[bool] conn_id=None, # type: Optional[str] *args, **kwargs ): super().__init__( table='dummy', metrics_thresholds=metrics_thresholds, ratio_formula=ratio_formula, ignore_zero=ignore_zero, conn_id=conn_id, *args, **kwargs) self.sql1 = sql1 self.sql2 = sql2
dag側の記述。maxやavgはそのまま比較できるけど、countは7日分取ると7倍になってしまうので7で割っている。
metrics_thresholds = { "cnt": 1.2, "avg": 1.2, "max": 1.2 } sql1 = """select count(*) cnt, cast(avg(value) as float) avg, max(value) max from example where dt=""" + "'{{ ds }}'" sql2 = """select cast(count(*) / 7.0 as float) cnt, cast(avg(value) as float) avg, max(value) max from example where dt between """ + \ "'{{ macros.ds_add(ds, -8) }}' AND '{{ macros.ds_add(ds, -1) }}'" from airflow.operators.check_operator import IntervalCheckOperator check_op = SqlDiffCheckOperator( sql1=sql1, sql2=sql2, metrics_thresholds=metrics_thresholds, conn_id='postgres_default', task_id='checker', dag=dag)
実行結果。where句のtimestampのcastを書き忘れたので意図した結果になってないけどまあいいや。
WARNING - The following 3 tests out of 3 failed: WARNING - 'avg' check failed. 1.4000000000000006 is above 1.2 WARNING - 'cnt' check failed. 1.225 is above 1.2 WARNING - 'max' check failed. 1.875 is above 1.2 ERROR - The following tests have failed:
この手の汎用的に使えそうなOperatorを用意しておいて、継承してcount用やaverage用のOperatorをそれぞれ作ったり、SQL生成のところはマクロにしたりすると便利そう。
改定履歴
Author: Masato Watanabe, Date: 2019-05-09, 記事投稿