概要
PySparkでローカルの任意のモジュールをクラスタ上で動かしたい場合の指定について確認する。
バージョン情報
- spark-2.3.1
- Python 3.5.5
ディレクトリ構成
今回利用するディレクトリの構成は下記のような想定。myというディレクトリに独自モジュールがいくつか入っていて、この中のコードがpysparkのクラスタ上で動くことになる。
├── main.py └── my ├── script1.py ├── script2.py └── script3.py
操作するテーブル
本稿の処理はtime, url, os, status_codeという4つのカラムが入っているアクセスログテーブルを操作するものとします。
カラム名 | 型 | 意味 |
---|---|---|
time | long | アクセス時のunix timestamp |
url | string | リクエストされたページのURL |
os | string | user agentから取ったOS文字列 |
status_code | int | HTTPのstatus code |
動かない例
まず my/script1.py にはOSがiPhoneだったらhogeに変換してしまうコードが入っているとします。
# my/script1.py def iphone2hoge(row): ''' osカラムの内容がiPhoneで始まる時はhogeに変換する ''' dic = row.asDict() if dic['os'].startswith('iPhone'): dic['os'] = 'hoge' return pyspark.sql.types.Row(**dic)
これをmain.pyから呼び出す。
from my import script1 from pyspark.sql import SparkSession spark = SparkSession.builder.enableHiveSupport().getOrCreate() df = spark.sql('select time, os from log_table limit 10') \ .rdd.map(script1.iphone2hoge) \ # ここでscript1を呼び出している .collect() print(df)
これを呼び出すと、sparkクラスタ上には my.script1 は置かれていないので map(script1.iphone2hoge)するところでエラーになる。
$ spark-submit \ --master yarn \ main.py
ImportError: No module named 'my'
--py-files でscriptを配布する
spark-submitする際に--py-filesを指定すると、任意のスクリプトを配布することができる。
$ spark-submit \ --master yarn \ --py-files my/script1.py \ main.py
これで動く・・・わけではなかった。
ImportError: No module named 'my'
--py-filesで指定した場合、ディレクトリは配布されないので、クラスタ上からは my/script1.py ではなく直下に script1.py がある形に見えるようだ。
下記のようにimport script1とすれば動作する。
from my import script1 ↓ import script1
下記は実行結果を整形したもの。ちゃんとosがhogeに変換されている。
[ Row(os='hoge', time=1543818115), Row(os='Android', time=1543816877), Row(os='hoge', time=1543819588), Row(os='Android', time=1543818941), Row(os='hoge', time=1543818356), Row(os='Android', time=1543819887), Row(os='hoge', time=1543819879), Row(os='Mac', time=1543817962), Row(os='hoge', time=1543819052), Row(os='Android', time=1543819118) ]
--py-files にzipを渡す
ディレクトリをzipで固めて引数で渡すこともできる。
$ zip -r my.zip my
これでspark-submit時に my.zip を指定する。
$ spark-submit \ --master yarn \ --py-files my.zip \ main.py
この場合は my/script1.py に展開されるので from my import script1 でimportすることになる。
sc.addPyFile で追加する
--py-files 指定をする場合は起動スクリプト側でリソースを管理することになる。
ソース側で指定する場合はsparkContextでaddPyFileする。
from pyspark.sql import SparkSession spark = SparkSession.builder.enableHiveSupport().getOrCreate() sc = spark.sparkContext sc.addPyFile('my/script1.py') import script1 # --py-filesの時と同じく同階層でimport # do something
addPyFileにはzipを指定することもできる。
sc.addPyFile('my.zip') from my import script1 # この場合はmy配下になる
これで--py-filesを指定しなくても独自モジュールを呼び出すことができる。
jupyter上で利用する場合
jupyterでも同様の動きができる。下記はpysparkコマンドに--py-filesを指定する場合。
$ pyspark --py-files my/script1.py
importできる。
import script1 df = spark.sql('select time, os from log_table limit 10') \ .rdd.map(script1.hoge_map) \ .collect()
Jupyterで実行中にaddPyFileで追加することも可能。
まずは何も追加せずに実行してみてエラーにする。
from my import script1 df = spark.sql('select time, os from log_table limit 10') \ .rdd.map(script1.hoge_map) \ .collect()
実行結果
ImportError: No module named 'my'
エラーになった後にzipファイルをaddPyFileしてみると正常に動作する。
sc.addPyFile('my.zip') df = spark.sql('select time, os from log_table limit 10') \ .rdd.map(script1.hoge_map) \ .collect()
改定履歴
Author: Masato Watanabe, Date: 2019-01-21, 記事投稿