概要
AirflowのSSHOperatorで指定のサーバーにsshしてコマンドを実行する。
バージョン情報
- apache-airflow==1.10.2
- Python 3.6.8
SSHOperatorの引数
SSHOperator実行時はこのへんのパラメータを指定する。
parameter | description |
---|---|
ssh_conn_id | ConnectionのID(必須) |
ssh_hook | 指定がなければairflow.contrib.hooks.ssh_hook.SSHHookで生成される |
remote_host | 接続先ホスト |
command | 実行したいコマンド |
基本的にはssh_conn_idのところに必要な設定を書いておけば良い。
.ssh/confの扱い
ssh_hookは、.ssh/confを自動で読み込んでくれるようになっている。
https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/hooks/ssh_hook.py#L131
が、host_nameやportは読み込まない等、全ての設定を読んでくれるわけではないようなので、.ssh/confによる指定には頼らず、地道にConnectionのところに必要な情報を書くのが良さそう。(version 1.10時点)
Connectionの用意
仮に「noren」という名前のhostがあったとする。
Web画面のAdmin → Connectionsを開いて、Createから下記のようにsshの接続設定を記述する。(IPアドレスやポート等は実際に接続可能な値を指定してください)
Extraには利用する秘密鍵を絶対パスでJSON形式で指定している。
リモートホストでtouchを実行するSSHOperator
リモートホストでファイルをtouchするだけの処理を書いてみる。
import datetime from airflow import DAG from airflow.contrib.operators.ssh_operator import SSHOperator args = { 'owner': 'masato watanabe', 'start_date': datetime.datetime(2019, 1, 1) } # DAGの設定。実行頻度は1日1回にしてみる。 dag = DAG('ssh_dag', default_args=args, schedule_interval='0 0 * * *') # SSHOperatorの指定(ssh_conn_idに先程作ったconnectionを指定) ssh_touch = SSHOperator( task_id='ssh_touch', ssh_conn_id='ssh_noren', command='touch foo', dag=dag )
実行する
$ airflow run ssh_dag ssh_touch 2019-02-02
リモートホストに foo というファイルができていれば成功。
エラー時の対応
ssh周りは何かしらエラーが出る。下記あたりはよく遭遇した。
paramiko.ssh_exception.SSHException: No existing session
SSHの接続にはparamikoが使われている。
https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/hooks/ssh_hook.py#L164
渡しているパラメータが、下記のような簡易コードでログインできる内容になっていればだいたい通るはず。
import paramiko client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(hostname='host_name', username='user_name', key_filename='key_file', port=port)
SSHHookで接続を指定する
上記はConnectionで指定したが、SSHHookで指定する場合も書いておく。
同一の接続設定でホスト名だけが違う複数サーバーに対して処理を行う場合は、ssh_connで大まかな設定だけしてremote_host等はSSHHookで書き換える方が管理が楽な場合もある。
from airflow.contrib.hooks.ssh_hook import SSHHook hook = SSHHook( ssh_conn_id='ssh_default', remote_host='192.168.164.15', username='watanabe', key_file='/home/masato/.ssh/id_rsa_noren', port=7429 ) ssh_touch = SSHOperator( task_id='ssh_touch', ssh_conn_id='ssh_noren', ssh_hook=hook, command='touch bar', dag=dag )
改定履歴
Author: Masato Watanabe, Date: 2019-02-12, 記事投稿