iMind Developers Blog

iMind開発者ブログ

PySparkで任意のライブラリを気軽に入れたい

概要

PySpark上で任意のPythonライブラリを動かしたい。しかし管理者ではないのでカジュアルにpipで何かを入れたりはできない。

そんな時、Clouderaの下記記事によるとconda createで生成した環境を配布して実行すれば管理者以外でも好きな環境でPythonを動かすことができるらしい。

Sparkクラスタ上で好きなPythonライブラリをCloudera Data Science Workbenchから使う

たいへん便利そうなので試してみる。

バージョン情報

  • spark-2.3.1
  • Python 3.5.5
  • conda 4.5.11

動かしたいコード

ログが入っているhiveテーブルからtimeというカラム(unix timeが入っている)にNumPyのrandomでランダム秒足すという意味のない処理を動かす。

from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

import numpy as np

def map_func(row):
    ''' ランダム秒足す '''
    return row['time'] + np.random.random() * 1000

# 5行だけ取ってmapを噛ませて結果をfooディレクトリに格納する
spark.sql('SELECT time FROM log_table LIMIT 5') \
    .rdd.map(map_func) \
    .saveAsTextFile("foo")

当該Spark環境にはNumPyは入っておらず、そのまま実行すると下記のようなエラーになる。

$ spark-submit your_python_file.py

ImportError: No module named 'numpy'

condaで環境の用意

Anacondaが事前に入っているものとする。

下記のようにconda createで環境を用意する。

$ conda create -n my_pyspark_env --copy -y -q python=3.5 numpy

今回利用する環境ではPythonが3.5を利用している為、python=3.5としている。

これでnumpy入りの環境が出来上がった。追加でライブラリを入れたい場合はsource activateしてからpip。

$ source activate my_pyspark_env
$ pip install pandas

うちの環境ではcondaは$HOMEのminiconda3配下に入っている。

$ which conda
~/miniconda3/bin/conda

この場合、conda createして作った環境は ~/miniconda3/envs 配下にmy_pyspark_envという名前でディレクトリが作られる。

$ ls ~/miniconda3/envs
my_pyspark_env

これをzipで固めてworkディレクトリに置いておく。

$ cd ~/miniconda3/envs
$ mkdir ~/work
$ zip -r ~/work/my_pyspark_env.zip my_pyspark_env

$ ls -lh ~/my_pyspark_env.zip
-rw-r--r-- 1 user group 421M 12月  4 14:16 my_pyspark_env.zip

421MBになった。ちょっと大きいけどSpark環境においてはこれくらいのサイズなら配布しても許容されると思われる。これより大きいfat jarもたまに見かけるし。

上記はnumpyという比較的サイズを喰うライブラリを入れているので大きくなっているが、プレーンな状態であれば149MB程度に収まるようだ。

$ conda create -n my_pyspark_env2 --copy -y -q python=3.5
$ zip -r my_pyspark_env2.zip ~/miniconda3/envs/my_pyspark_env2
$ ls -lh my_pyspark_env2.zip

-rw-r--r-- 1 user group 149M 12月  4 14:16 my_pyspark_env2.zip

独自環境を配布してPySparkを動かす

Sparkではspark-submit時にarchivesを指定することでファイルを分散環境に配布できる。

下記はローカルのmy_pyspark_env.zipという名前のファイルを配布して、my_pysparkというフォルダに展開する、という意味になる。

--archives "my_pyspark_env.zip#my_pyspark"

アーカイブを解凍した際のPythonのパスはmy_pysparkフォルダの下に当該zipを解凍した場合と同じファイル構成になる。

# つまりこういう感じ
$ mkdir my_pyspark
$ mv my_pyspark_env.zip my_pyspark/
$ cd my_pyspark
$ unzip my_pyspark_env.zip

ということで、pythonのパスは ./my_pyspark/my_pyspark_env/bin/python となる。

あとはPYSPARK_PYTHONとPYSPARK_DRIVER_PYTHONに解凍後のパスに指定するか、spark-defaults.confなどに記述すれば配布したPythonを利用するようになる。

面倒がないようにローカルとSpark側のパスが同じになるようにsymlinkを貼っておく。

$ cd ~/work
$ mkdir my_pyspark
$ ln -s ~/miniconda3/envs/my_pyspark_env my_pyspark

--archives、pyspark_python, pyspark_driver_python等を指定しつつ実行。

$ spark-submit \
    --master yarn \
    --archives "my_pyspark_env.zip#my_pyspark" \
    --conf spark.pyspark.python=./my_pyspark/my_pyspark_env/bin/python \
    --conf spark.pyspark.driver.python=./my_pyspark/my_pyspark_env/bin/python \
    your_python_file.py

今回のコードでは saveAsTextFile("foo") しているので、HDFS上のfoobarディレクトリに結果が出ていることを確認する。

$ hadoop fs -text foo/*

1543796425.0570402
1543796798.668103
1543798751.5653195
1543798088.5450292
1543799558.5535486

無事動いたようだ。

confで指定する場合、deploy-modeがclient(デフォルト)の場合はspark.pyspark.pythonとspark.pyspark.driver.pythonに、deploy-modeがclusterの場合はspark.yarn.appMasterEnvで指定する。

confファイルに記述する例。

spark.pyspark.python ./my_pyspark/my_pyspark_env/bin/python
spark.pyspark.driver.python ./my_pyspark/my_pyspark_env/bin/python

spark.yarn.appMasterEnv.PYSPARK_PYTHON ./my_pyspark/my_pyspark_env/bin/python
spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON ./my_pyspark/my_pyspark_env/bin/python

--confで指定する例。

$ spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --archives "my_pyspark_env.zip#my_pyspark" \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./my_pyspark/my_pyspark_env/bin/python \
    --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./my_pyspark/my_pyspark_env/bin/python \
    your_python_file.py

これで気軽に好きなPython環境を利用できる。めでたし。

改定履歴

Author: Masato Watanabe, Date: 2019-01-19, 記事投稿