iMind Developers Blog

iMind開発者ブログ

AirflowのSSHOperatorを使う

概要

AirflowのSSHOperatorで指定のサーバーにsshしてコマンドを実行する。

バージョン情報

  • apache-airflow==1.10.2
  • Python 3.6.8

SSHOperatorの引数

SSHOperator実行時はこのへんのパラメータを指定する。

parameter description
ssh_conn_id ConnectionのID(必須)
ssh_hook 指定がなければairflow.contrib.hooks.ssh_hook.SSHHookで生成される
remote_host 接続先ホスト
command 実行したいコマンド

基本的にはssh_conn_idのところに必要な設定を書いておけば良い。

.ssh/confの扱い

ssh_hookは、.ssh/confを自動で読み込んでくれるようになっている。

https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/hooks/ssh_hook.py#L131

が、host_nameやportは読み込まない等、全ての設定を読んでくれるわけではないようなので、.ssh/confによる指定には頼らず、地道にConnectionのところに必要な情報を書くのが良さそう。(version 1.10時点)

Connectionの用意

仮に「noren」という名前のhostがあったとする。

Web画面のAdmin → Connectionsを開いて、Createから下記のようにsshの接続設定を記述する。(IPアドレスやポート等は実際に接続可能な値を指定してください)

f:id:mwsoft:20190208123004p:plain

Extraには利用する秘密鍵を絶対パスでJSON形式で指定している。

リモートホストでtouchを実行するSSHOperator

リモートホストでファイルをtouchするだけの処理を書いてみる。

import datetime
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator

args = {
    'owner': 'masato watanabe',
    'start_date': datetime.datetime(2019, 1, 1)
}

# DAGの設定。実行頻度は1日1回にしてみる。
dag = DAG('ssh_dag', default_args=args, schedule_interval='0 0 * * *')

# SSHOperatorの指定(ssh_conn_idに先程作ったconnectionを指定)
ssh_touch = SSHOperator(
    task_id='ssh_touch',
    ssh_conn_id='ssh_noren',
    command='touch foo',
    dag=dag
)

実行する

$ airflow run ssh_dag ssh_touch 2019-02-02

リモートホストに foo というファイルができていれば成功。

エラー時の対応

ssh周りは何かしらエラーが出る。下記あたりはよく遭遇した。

paramiko.ssh_exception.SSHException: No existing session

SSHの接続にはparamikoが使われている。

https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/hooks/ssh_hook.py#L164

渡しているパラメータが、下記のような簡易コードでログインできる内容になっていればだいたい通るはず。

import paramiko

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

client.connect(hostname='host_name,
                           username='user_name',
                           key_filename='key_file',
                           port=port)

SSHHookで接続を指定する

上記はConnectionで指定したが、ssh_connで指定する場合も書いておく。

同一の接続設定でホスト名だけが違う複数サーバーに対して処理を行う場合は、ssh_connで大まかな設定だけしてremote_host等はSSHHookで書き換える方が管理が楽な場合もある。

from airflow.contrib.hooks.ssh_hook import SSHHook

hook = SSHHook(
    ssh_conn_id='ssh_default',
    remote_host='192.168.164.15',
    username='watanabe',
    key_file='/home/masato/.ssh/id_rsa_noren',
    port=7429
)

ssh_touch = SSHOperator(
    task_id='ssh_touch',
    ssh_conn_id='ssh_noren',
    ssh_hook=hook,
    command='touch bar',
    dag=dag
)

改定履歴

Author: Masato Watanabe, Date: 2019-02-12, 記事投稿