spark
概要 PySparkで下記のようにソートしてからファイルに保存しようと思った場合。 df.repartition(5).orderBy(df.colA).write ... この記述は出力されるファイル数が5になることを期待しているが、orderByの際にpartition数が変動してしまう為期待した結果には…
概要 PySparkで機械学習系の処理をする際にSparseVectorに変換したいケースが多々あるのでそのあたりの処理を確認する。 バージョン情報 Spark 2.4.3 サンプルデータ いつもお世話になっているMovieLensのデータで、ユーザーごとの映画のレビューをSparseVec…
概要 PySparkでDataFrameから指定の割合や指定のレコード数でランダムサンプリングする。 バージョン情報 PySpark 2.4.3 サンプルデータ 1〜9999までの数値を持つDataFrameを用意する。 import pandas as pd import numpy as np np.random.seed(0) # 0〜100…
概要 PySparkでlistの中に入っている値を使ってOR条件を生成したい。 例えばlistに[('a', 1), ('b', 2)]のようなtupleが入っていて、 (x='a' and y=1) or (x='b' and y=2) のような条件を生成する。 バージョン情報 Spark 2.4.3 サンプルデータ カラムx(char…
概要 PySparkでhiveのテーブルのpartitionから最新のものを取りたかった。 何か良い方法を前に見かけたような気がするのだけど思い出せないのでshow partitionsで取れた文字列をパースする。 バージョン情報 Spark 2.4.3 テーブル生成 partitionカラムとして…
概要 例えばMovieLensのデータで各ユーザーがどの映画を見たかを、movieIdのArrayで持っているテーブルがあったとする。 # userIdと映画のidの配列を持つDataFrame df.take(3) #=> [Row(userId='134990', movieIds=['2793', '743', '2572', '4159', '59369',…
概要 大きいDataFrameをページで区切りながらtoPandas()して保存してみる。 バージョン情報 Spark 2.4.3 サンプルデータの用意 1〜100万までのIDとランダムなスコアを持つDataFrameを用意する。 import pandas as pd import numpy as np np.random.seed(0) #…
概要 Spark2.4から複数のArrayカラムをconcatできるようになったらしいので試してみる。 https://issues.apache.org/jira/browse/SPARK-23736 バージョン情報 Spark 2.4.3 サンプルデータ サンプルデータとして、3つのArrayのカラム(名前はfoo, bar, baz)…
概要 spark-submitした際にコンソールやweb ui、ログなどに表示されるプログレス表示。 ↑ こういうやつ 標準出力をログに落としていたらこれのせいでログサイズがデカくなり過ぎたので、更新頻度を調整してみる。 バージョン情報 Spark 2.4.3 progress周りで…
概要 PySparkでSELECTしたカラムの名前を変更する方法を確認する。 バージョン情報 Spark 2.4.3 サンプルデータ カラムfoo, bar, bazの3つを持つtmp_exampleテーブルを用意。 import pandas as pd import numpy as np np.random.seed(0) # 0〜100までの値が…
概要 Spark SQLでSELECTした値をINSERT OVERWRITEしようとした際にファイルが細かくsplitされてHDFSの容量をムダに喰ってしまうことがある。 Spark SQLではrepartitionヒント文を使うことで分割するファイル数を指定することができる。 バージョン情報 Spark…
概要 PySparkはUDF/UDAFが手軽に書けて便利。 バージョン情報 Python 3.7.3 Spark 2.4.3 サンプルデータ カンマ区切りの文字列が入ったカラムを用意して、それをsplitしたり戻したりといった処理をudfで実行してみる。 サンプルデータとしてA, B, Cの3つのカ…
概要 PySparkで整形したrddをtoDFしようとしたら下記のようなエラーが起きて怒られた。 ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling データを確認すると、処理結果の上位がNoneになっていて型の…
概要 PySparkで作成したUDFをPythonやSparkの経験がない人にも使ってもらいたい時は、registerしてSQLで呼び出せるようにするとハードルが下がる。 本稿はpysparkでUDFを書く、spark.udf.registerでUDFを登録する、クエリから呼び出す、といった一連の流れを…
概要 AirflowのSparkSubmitOperatorを使ってPySparkのスクリプトファイルをspark-submitで実行する。 バージョン情報 Python 3.6.7 apache-airflow==1.10.1 spark 2.3.1 PySpark側のコード 適当にHDFS上のファイルを読み込んで行数をcountするコードを書いて…
概要 PySParkで行に0〜nまでの連続する数値を採番したかった。 バージョン情報 spark-2.3.1 Python 3.5.5 サンプルデータ 下記のような2つのカラムを持つCSVファイル(100万行)を利用。 $ gunzip -c foo.csv.gz | head -5 0,0.194617 1,0.184299 2,0.988041…
概要 PySparkでHDFS上のファイルをちょろっと操作したい時の為に、JavaのクラスをPySparkから呼び出してlsしたりwriteしたりrmするサンプルコードをまとめておく。 バージョン情報 spark-2.3.1 Hadoop 2.6.0 Python 3.5.5 FileSystemの取得 まずは必要なJava…
概要 PySparkを利用して日本語版Wikipediaの全文を取り込んでわかち書きし、word2vecに放り込んでみる。 XMLのパース、わかち書き、word2vec等の全行程をPySpark上で行う。 バージョン情報 spark-2.3.1 Python 3.5.5 Janome==0.3.6 Janomeの動く環境を用意 S…
概要 PySparkでローカルの任意のモジュールをクラスタ上で動かしたい場合の指定について確認する。 バージョン情報 spark-2.3.1 Python 3.5.5 ディレクトリ構成 今回利用するディレクトリの構成は下記のような想定。myというディレクトリに独自モジュールが…
概要 WikipediaのXMLファイルをSpark上で読み込んでみたかったので、SparkでのXMLの読み書きについて学習する。 バージョン情報 spark-2.3.1 Python 3.5.5 サンプルデータ 最初からWikipediaのデータを扱うのは怖いので(bzip2で固めて2.6GB)、まずは小さな…
概要 PySpark上で任意のPythonライブラリを動かしたい。しかし管理者ではないのでカジュアルにpipで何かを入れたりはできない。 そんな時、Clouderaの下記記事によるとconda createで生成した環境を配布して実行すれば管理者以外でも好きな環境でPythonを動…