概要
AirflowのSparkSubmitOperatorを使ってPySparkのスクリプトファイルをspark-submitで実行する。
バージョン情報
- Python 3.6.7
- apache-airflow==1.10.1
- spark 2.3.1
PySpark側のコード
適当にHDFS上のファイルを読み込んで行数をcountするコードを書いておく。
# tmp配下のファイルを読み込んでカウントするだけのコード def count(): import pyspark from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() record_count = spark.read.format('csv') \ .load('tmp') \ .count() return record_count record_count = count() # 0だったら例外が出るようにしておく from airflow.exceptions import AirflowException if record_count == 0: raise AirflowException('record not found') print(record_count)
dagのコード
SparkSubmitOperatorを記述する。
from airflow import DAG, settings from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator import os, datetime, logging args = { 'owner': 'masato watanabe', 'start_date': datetime.datetime(2018, 12, 15) } dag = DAG('count_dag', default_args=args, schedule_interval='0 * * * *') count = SparkSubmitOperator( name='count_job', application='%s/count.py' % settings.DAGS_FOLDER, conn_id='spark_default', task_id='count', dag=dag )
SparkSubmitOperatorの引数に指定されているnameはSparkが実行される際のアプリケーション名。
applicationには実行するPythonのコードを指定する。dags_folderの直下に置いてある想定で絶対パスで指定している。
conn_idにはspark_defaultを指定しているがこれはWeb画面の Admin → Connectionsで表示されるページにinitdb時に自動で設定されている値。
spark_defaultではExtraにqueueの名前が指定されている。必要があれば変更する。
SPARK_HOMEを指定したい場合はextraに下記のようにJSON形式で spark-home を記述しておく。
{"spark-home": "/home/user/local/spark-2.3.1"}
その他extraにはspark-binary、deploy_mode、namespaceなどが指定できる。
Pythonのコードに引数を渡す
PySpark側のコードからsys.argvで取れるような値を送りたい場合は application_args にリスト形式で設定する。
試しにExecution Dateの値を --date %Y%m%d --hour %Hの形で整形して送ってみる。
app_args = ['--date', "{{ ds_nodash }}", '--hour', "{{ execution_date.strftime('%H') }}"] count = SparkSubmitOperator( application='{}/count.py'.format(settings.DAGS_FOLDER), conn_id='spark_default', task_id='count', application_args=app_args, dag=dag )
実行するとPySpark側から見た sys.argv は下記のようになる。
['/home/user/airflow/dags/count.py', '--date', '20181218', '--hour', '22']
PYSPARK_PYTHONの指定について
PYSPARK_PYTHONやPYSPARK_DRIVER_PYTHONを指定する場合は、env_varsにdictionary形式で指定する。
count = SparkSubmitOperator( application='{}/count.py'.format(settings.DAGS_FOLDER), conn_id='spark_default', task_id='count', env_vars = {'PYSPARK_PYTHON': '~/miniconda3/bin/python'}, dag=dag )
上記を実行するとspark-submit時に --conf spark.yarn.appMasterEnv.PYSPARK_PYTHONでパラメータが渡される。
['spark-submit', '--master', 'yarn', '--conf', 'spark.yarn.appMasterEnv.PYSPARK_PYTHON=~/miniconda3/bin/python', '--name', 'airflow-spark', '/home/user/airflow/dags/count.py']
改定履歴
Author: Masato Watanabe, Date: 2019-2-14, 記事投稿