iMind Developers Blog

iMind開発者ブログ

Airflowでpythonのコードをspark-submit

概要

AirflowのSparkSubmitOperatorを使ってPySparkのスクリプトファイルをspark-submitで実行する。

バージョン情報

  • Python 3.6.7
  • apache-airflow==1.10.1
  • spark 2.3.1

PySpark側のコード

適当にHDFS上のファイルを読み込んで行数をcountするコードを書いておく。

# tmp配下のファイルを読み込んでカウントするだけのコード
def count():
    import pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()
    record_count = spark.read.format('csv') \
        .load('tmp') \
        .count()
    return record_count

record_count = count()

# 0だったら例外が出るようにしておく
from airflow.exceptions import AirflowException
if record_count == 0:
    raise AirflowException('record not found')

print(record_count)

dagのコード

SparkSubmitOperatorを記述する。

from airflow import DAG, settings
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

import os, datetime, logging

args = {
    'owner': 'masato watanabe',
    'start_date': datetime.datetime(2018, 12, 15)
}

dag = DAG('count_dag', default_args=args, schedule_interval='0 * * * *')

count = SparkSubmitOperator(
    name='count_job',
    application='%s/count.py' % settings.DAGS_FOLDER,
    conn_id='spark_default',
    task_id='count',
    dag=dag
)

SparkSubmitOperatorの引数に指定されているnameはSparkが実行される際のアプリケーション名。

applicationには実行するPythonのコードを指定する。dags_folderの直下に置いてある想定で絶対パスで指定している。

conn_idにはspark_defaultを指定しているがこれはWeb画面の Admin → Connectionsで表示されるページにinitdb時に自動で設定されている値。

spark_defaultではExtraにqueueの名前が指定されている。必要があれば変更する。

SPARK_HOMEを指定したい場合はextraに下記のようにJSON形式で spark-home を記述しておく。

{"spark-home": "/home/user/local/spark-2.3.1"}

その他extraにはspark-binary、deploy_mode、namespaceなどが指定できる。

Pythonのコードに引数を渡す

PySpark側のコードからsys.argvで取れるような値を送りたい場合は application_args にリスト形式で設定する。

試しにExecution Dateの値を --date %Y%m%d --hour %Hの形で整形して送ってみる。

app_args = ['--date', "{{ ds_nodash }}", '--hour', "{{ execution_date.strftime('%H') }}"]
count = SparkSubmitOperator(
    application='{}/count.py'.format(settings.DAGS_FOLDER),
    conn_id='spark_default',
    task_id='count',
    application_args=app_args,
    dag=dag
)

実行するとPySpark側から見た sys.argv は下記のようになる。

 ['/home/user/airflow/dags/count.py', '--date', '20181218', '--hour', '22']

PYSPARK_PYTHONの指定について

PYSPARK_PYTHONやPYSPARK_DRIVER_PYTHONを指定する場合は、env_varsにdictionary形式で指定する。

count = SparkSubmitOperator(
    application='{}/count.py'.format(settings.DAGS_FOLDER),
    conn_id='spark_default',
    task_id='count',
    env_vars = {'PYSPARK_PYTHON': '~/miniconda3/bin/python'},
    dag=dag
)

上記を実行するとspark-submit時に --conf spark.yarn.appMasterEnv.PYSPARK_PYTHONでパラメータが渡される。

['spark-submit',
    '--master', 'yarn',
    '--conf', 'spark.yarn.appMasterEnv.PYSPARK_PYTHON=~/miniconda3/bin/python',
    '--name', 'airflow-spark',
    '/home/user/airflow/dags/count.py']

改定履歴

Author: Masato Watanabe, Date: 2019-2-14, 記事投稿