iMind Developers Blog

iMind開発者ブログ

PySparkで独自モジュールを呼び出す

概要

PySparkでローカルの任意のモジュールをクラスタ上で動かしたい場合の指定について確認する。

バージョン情報

  • spark-2.3.1
  • Python 3.5.5

ディレクトリ構成

今回利用するディレクトリの構成は下記のような想定。myというディレクトリに独自モジュールがいくつか入っていて、この中のコードがpysparkのクラスタ上で動くことになる。

├── main.py
└── my
    ├── script1.py
    ├── script2.py
    └── script3.py

操作するテーブル

本稿の処理はtime, url, os, status_codeという4つのカラムが入っているアクセスログテーブルを操作するものとします。

カラム名 意味
time long アクセス時のunix timestamp
url string リクエストされたページのURL
os string user agentから取ったOS文字列
status_code int HTTPのstatus code

動かない例

まず my/script1.py にはOSがiPhoneだったらhogeに変換してしまうコードが入っているとします。

# my/script1.py 

def iphone2hoge(row):
    ''' osカラムの内容がiPhoneで始まる時はhogeに変換する '''
    dic = row.asDict()
    if dic['os'].startswith('iPhone'):
        dic['os'] = 'hoge'
    return pyspark.sql.types.Row(**dic) 

これをmain.pyから呼び出す。

from my import script1
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df = spark.sql('select time, os from log_table limit 10') \
    .rdd.map(script1.iphone2hoge) \ # ここでscript1を呼び出している
    .collect()
print(df)

これを呼び出すと、sparkクラスタ上には my.script1 は置かれていないので map(script1.iphone2hoge)するところでエラーになる。

$ spark-submit \
    --master yarn \
    main.py 
ImportError: No module named 'my'

--py-files でscriptを配布する

spark-submitする際に--py-filesを指定すると、任意のスクリプトを配布することができる。

$ spark-submit \
    --master yarn \
    --py-files my/script1.py \
    main.py 

これで動く・・・わけではなかった。

ImportError: No module named 'my'

--py-filesで指定した場合、ディレクトリは配布されないので、クラスタ上からは my/script1.py ではなく直下に script1.py がある形に見えるようだ。

下記のようにimport script1とすれば動作する。

from my import script1
  ↓
import script1

下記は実行結果を整形したもの。ちゃんとosがhogeに変換されている。

[
  Row(os='hoge', time=1543818115),
  Row(os='Android', time=1543816877),
  Row(os='hoge', time=1543819588),
  Row(os='Android', time=1543818941),
  Row(os='hoge', time=1543818356),
  Row(os='Android', time=1543819887),
  Row(os='hoge', time=1543819879),
  Row(os='Mac', time=1543817962),
  Row(os='hoge', time=1543819052),
  Row(os='Android', time=1543819118)
]

--py-files にzipを渡す

ディレクトリをzipで固めて引数で渡すこともできる。

$ zip -r my.zip my

これでspark-submit時に my.zip を指定する。

$ spark-submit \
    --master yarn \
    --py-files my.zip \
    main.py 

この場合は my/script1.py に展開されるので from my import script1 でimportすることになる。

sc.addPyFile で追加する

--py-files 指定をする場合は起動スクリプト側でリソースを管理することになる。

ソース側で指定する場合はsparkContextでaddPyFileする。

from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()
sc = spark.sparkContext

sc.addPyFile('my/script1.py')
import script1 # --py-filesの時と同じく同階層でimport

# do something

addPyFileにはzipを指定することもできる。

sc.addPyFile('my.zip')
from my import script1 # この場合はmy配下になる

これで--py-filesを指定しなくても独自モジュールを呼び出すことができる。

jupyter上で利用する場合

jupyterでも同様の動きができる。下記はpysparkコマンドに--py-filesを指定する場合。

$ pyspark --py-files my/script1.py

importできる。

import script1
df = spark.sql('select time, os from log_table limit 10') \
    .rdd.map(script1.hoge_map) \
    .collect()

Jupyterで実行中にaddPyFileで追加することも可能。

まずは何も追加せずに実行してみてエラーにする。

from my import script1
df = spark.sql('select time, os from log_table limit 10') \
    .rdd.map(script1.hoge_map) \
    .collect()

実行結果

ImportError: No module named 'my'

エラーになった後にzipファイルをaddPyFileしてみると正常に動作する。

sc.addPyFile('my.zip')
df = spark.sql('select time, os from log_table limit 10') \
    .rdd.map(script1.hoge_map) \
    .collect()

改定履歴

Author: Masato Watanabe, Date: 2019-01-21, 記事投稿