概要
PySparkで機械学習系の処理をする際にSparseVectorに変換したいケースが多々あるのでそのあたりの処理を確認する。
バージョン情報
- Spark 2.4.3
サンプルデータ
いつもお世話になっているMovieLensのデータで、ユーザーごとの映画のレビューをSparseVectorに変換してみる。
from pyspark.sql import functions as F, types as T # MovieLensのratings.csvを読み込む df = spark.read \ .option('header', 'true') \ .csv('ml-20m/ratings.csv') # Stringになっているのでcastしておく df = df.select( F.col('userId').cast(T.IntegerType()), F.col('movieId').cast(T.IntegerType()), F.col('rating').cast(T.FloatType())) # こういうデータ df.take(3) #=> [Row(userId=1, movieId=2, rating=3.5), #=> Row(userId=1, movieId=29, rating=3.5), #=> Row(userId=1, movieId=32, rating=3.5)]
このデータを { movieId : rating } のSparseVecotrに変換する。
SparseVectorの生成
groupByしてcollect_listしてSparseVecotrに渡す流れでやってみる。
SparseVectorに渡す引数はfeatureの数(今回だとmovieIdsのmax+1)、feature_id, weightの3つ。feature_idはソートされている必要がある。
from pyspark.sql import Row from pyspark.ml.linalg import Vectors # userIdとSparseVectorの入ったRowを返す def createSparseVector(row, max_id): # movieIdsでソートしておく lst = list(zip(row.movieIds, row.ratings)) lst = sorted(lst, key=lambda x: x[0]) # SparseVectorの生成 movieIds = [m for m, r in lst] ratings = [r for m, r in lst] features = Vectors.sparse(max_id, movieIds, ratings) # Rowに変換して返す return Row(userId=row.userId, features=features) # movieIdのmax+1を取る max_id = df.select(F.max('movieId').alias('maxId')).take(1)[0].maxId + 1 #=> 131263 # groupByしてcollect_listして users = df.groupBy('userId') \ .agg(F.collect_list('movieId').alias('movieIds'), F.collect_list('rating').alias('ratings')) \ .rdd.map(lambda row: createSparseVector(row, max_id)) users.take(3) #=> [Row(features=SparseVector(131263,, {1: 4.0, 25: 4.5, 32: 4.0, 47: 4.0, 110: 4.0, ... , userId='100263'), #=> Row(features=SparseVector(131263,, {1: 4.0, 10: 4.0, 19: 1.0, 34: 1.0, 39: 3.0, ... , userId='104603'), #=> Row(features=SparseVector(131263,, {377: 4.5, 497: 3.5, 608: 5.0, 784: 3.0, 832: 4.0, ... , userId='71627')]
これでMLlibとかでfitできるSparseVecotrが生成できた。
文字列からのSparseVectorの生成
SparseVecotrにはparseというメソッドが用意されている。
| parse(s) | Parse string representation back into the SparseVector. | | >>> SparseVector.parse(' (4, [0,1 ],[ 4.0,5.0] )') | SparseVector(4, {0: 4.0, 1: 5.0})
あまり使わなさそうなフォーマットだけど、localでこの形式で作っておいて読み込ませるということはできなくはない。
例えば下記のようにローカルで(Spark上でも良いですが)同フォーマットのファイルを作る
# 手元にpandasとして落とす local_users = df.groupBy('userId') \ .agg(F.collect_list('movieId').alias('movieIds'), F.collect_list('rating').alias('ratings')) \ .toPandas() local_users.head(3) #=> userId movieIds ratings #=> 0 111710 [10, 50, 260, ... [4.0, 4.0, 5.0, ... #=> 1 116194 [1, 2, 3, 4, 5, ,... [3.0, 4.0, 3.0, ... #=> 2 117211 [1, 9, 14, 17, ,... [5.0, 4.0, 4.0, ... # movieIdsのmax+1の取得 max_id = max(local_users.movieIds.apply(max))+1 #=> 131263 # rowを所定の形式に変換する import json def format_row(row, max_id): # ソートしておく lst = list(zip(row.movieIds, row.ratings)) lst = sorted(lst, key=lambda x: x[0]) movieIds = [m for m, r in lst] ratings = [r for m, r in lst] # 出力文字列 return '(%d, %s, %s)' % ( max_id, json.dumps(movieIds), json.dumps(ratings)) # userIdと所定形式のtsvで保存 import gzip with gzip.open('sparse_ratings.tsv.gz', 'wt') as f: # header f.write('userId\tfeatures\n') # rows for idx, row in local_users.iterrows(): f.write('%s\t%s\n' % (row.userId, format_row(row, max_id)))
このローカルファイルを読み込んでSparseVectorにしてみる。
# ローカルファイルをhdfsに置く FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem Path = sc._gateway.jvm.org.apache.hadoop.fs.Path fs = FileSystem.get(sc._jsc.hadoopConfiguration()) fs.copyFromLocalFile(Path('sparse_ratings.tsv.gz'), Path('sparse_ratings.tsv.gz')) # テキストとして読み込む users = spark.read.option("delimiter", "\t") \ .option('header', 'true') \ .csv('sparse_ratings.tsv.gz') users.take(3) #=> [Row(userId='100263', features='(131262, [1, 25, 32, 47, 110, ... #=> Row(userId='104603', features='(131262, [1, 10, 19, 34, 39, ... #=> Row(userId='71627', features='(131262, [377, 497, 608, 784, ... # SparseVectorに変換 users = users.rdd.map( lambda row: Row(userId=row.userId, features=SparseVector.parse(row.features).asML())) \ .toDF() users.take(3) #=> [Row(features=SparseVector(131263, {1: 4.0, 25: 4.5, 32: 4.0, ... , userId='100263'), #=> Row(features=SparseVector(131263, {1: 4.0, 10: 4.0, 19: 1.0, ... , userId='104603'), #=> Row(features=SparseVector(131263, {377: 4.5, 497: 3.5, ... , userId='71627')]
SparseVector.parseした値にasML()を実行している。これはmllib用の形式に変換する処理で、実行しないとUDFを通す時や機械学習系の処理でエラーになることがある。
下記はエラーの一例。
TypeError: cannot serialize SparseVector
AttributeError: 'SparseVector' object has no attribute 'toML'
LibSVM形式からSparseVectorにする
LibSVMでデータを渡せばそのままSparseVectorに変換できる。
上で作ったlocal_usersの出力を少し変えてLibSVM形式で出力する。labelを設定する必要があるので便宜的にすべて0にする。
def format_row(row): features = ' '.join(['%d:%d' % (m, r) for m, r in zip(row.movieIds, row.ratings)]) return '0 %s\n' % features with gzip.open('sparse_ratings.libsvm.gz', 'wt') as f: for idx, row in local_users.iterrows(): f.write(format_row(row))
これをformat('libsvm')で読むとSparseVectorが返る。
# ローカルファイルをhdfsに置く fs.copyFromLocalFile(Path('sparse_ratings.libsvm.gz'), Path('sparse_ratings.libsvm.gz')) # 読み込み vec = spark.read.format('libsvm') \ .option('numFeatures', max_id) \ .load("sparse_ratings.libsvm.gz") vec.take(3) #=> [Row(label=0.0, features=SparseVector(131262, {0: 4.0, 24: 4.0, ... #=> Row(label=0.0, features=SparseVector(131262, {0: 4.0, 9: 4.0, ... #=> Row(label=0.0, features=SparseVector(131262, {376: 4.0, 496: 3.0, ...
SparseVecotrのメソッド
SparseVecotorに用意されているメソッドをいくつか叩いてみる。
まずは1つのVectorを取得
v1 = users.take(2)[0].features v2 = users.take(2)[1].features v1 #=> SparseVector(131262, {2: 3.0, 10: 1.0, 11: 3.0, 17: 2.0, ...
size, indices, valuesで生成する際に指定した引数の値が取れる。
v1.size #=> 131262 v1.indices #=> array([ 2, 10, 11, 17, 19, 21, 25, 32, 34, 36 ... v1.values #=> array([3., 1., 3., 2., 3., 3., 3., 2., 3.,3., 4., 4., 4., ...
numNonzerosでzero以外の値が設定されている要素の数が取れる。
v1.numNonzeros()
#=> 116
squared_distanceで2つのベクトルの距離が計算できる。
v1.squared_distance(v2)
#=> 1955.0
toArrayはndarrayに変換して返す。
v1.toArray()
#=> array([0., 0., 3., ..., 0., 0., 0.])
norm
v1.norm(1) #=> 366.0
dot
v1.dot(v2)
#=> 43.0
添字で値の取得。
# 最初の要素は 2=3.0 v1 #=> SparseVector(131262, {2: 3.0, 10: 1.0, 11: 3.0, v1[2] #=> 3.0
SparseVecotrの値の更新
添字を指定して更新するような機能はないらしい。
v[2] = 5.0 #=> TypeError: 'SparseVector' object does not support item assignment
仕方ないのでtoArrayして値を更新して再度SparseVectorにする。
こんな感じでいけそうだけど、これだとnonzeroの値も記録されてしまうらしい。
import numpy as np # 2の値を5.0に変更 arr = v1.toArray() arr[2] = 5.0 # 再度SparseVectorに indices = np.arange(v1.size) v1_dash = SparseVector(v1.size, indices, arr) #=> SparseVector(131262, {0: 0.0, 1: 0.0, 2: 5.0, 3: 0.0, 4: 0.0, ...
0は除外しておいた方が良さそう。
# 再度SparseVectorに indices = np.arange(v1.size) indices = indices[arr != 0.0] arr = arr[arr != 0.0] v1_dash = SparseVector(max_id, indices, arr) #=> SparseVector(131262, {2: 5.0, 10: 1.0, 11: 3.0, 17: 2.0, 19: 3.0, ... # 元データと添字にズレがないか確認 v1 #=> SparseVector(131262, {2: 3.0, 10: 1.0, 11: 3.0, 17: 2.0, 19: 3.0, ...
上記の例ではindex=2だけが値が書き換えられたSparseVectorを生成している。
one-hotのSparseVecotrの作成
np.onesで必要な数1を用意するだけだけど、よく使うのでコピペできるようサンプルを書いておく。
MovieLensのレーティングを無視してレビューしたかどうかだけを0, 1で持つSparseVectorの生成。
import numpy as np from pyspark.mllib.linalg import SparseVector from pyspark.sql import Row from pyspark.sql import types as T from pyspark.sql import functions as F # userIdとSparseVectorの入ったRowを返す def createSparseVector(row, max_id): features = SparseVector(max_id, sorted(row.movieIds), np.ones(len(row.movieIds))) return Row(userId=row.userId, features=features) # movieIdのmaxを取る max_id = df.select(F.max('movieId').alias('maxId')).take(1)[0].maxId + 1 #=> 131262 # groupByしてcollect_listして users = df.groupBy('userId') \ .agg(F.collect_list('movieId').alias('movieIds'), F.collect_list('rating').alias('ratings')) \ .rdd.map(lambda row: createSparseVector(row, max_id)) \ .toDF() users.take(3) #=> [Row(features=SparseVector(131262, {50: 1.0, 158: 1.0, 160: 1.0, ... #=> Row(features=SparseVector(131262, {356: 1.0, 377: 1.0, 500: 1.0, ... #=> Row(features=SparseVector(131262, {110: 1.0, 112: 1.0, 260: 1.0, ...
学習器に放り込む
最後に、生成したVectorを使って何かしら学習をしてみる。
例としてCats & Dogs(id=4386)を見ている人を正解としてLogisticRegressionするコードでも書いてみる。
ちなみにCats & Dogsは視聴した人の数は2003人。ratingの平均は2.69らしい。全体のratingの平均が3.53なのでけっこう低い。なぜだ、名作じゃないか。
df.where(F.col('movieId') == '4386').count() #=> 2003 df.where(F.col('movieId') == '4386').select(F.avg('rating')).collect() #=> [Row(avg(rating)=2.691962056914628)]
まずはラベルを入れる。Cats & Dogs(id=4386)を見た人を1、それ以外を0でラベルを入れる。
# 視聴=1, 未視聴=0 df = df.withColumn('label', F.when(df.movieId == 4386, 1).otherwise(0)) # 正しく値が入っているか確認(視聴した人が2003件なのでOK) df.groupBy('label').count().collect() #=> [Row(label=1, count=2003), Row(label=0, count=19998260)]
SparseVectorに変換。labelカラムが増えたので今回はUDFで変換してみる。
UDFで扱う場合はtypeにVectorUDTを使う。
from pyspark.ml.linalg import Vectors, VectorUDT # SparseVectorに @F.udf(VectorUDT()) def createSparseVector(movieIds, ratings, max_id): # ソートしておく lst = list(zip(movieIds, ratings)) lst = sorted(lst, key=lambda x: x[0]) # SparseVectorの生成 movieIds = [m for m, r in lst] ratings = [r for m, r in lst] return Vectors.sparse(max_id, movieIds, ratings) # movieIdのmaxを取る max_id = df.select(F.max('movieId').alias('maxId')).take(1)[0].maxId + 1 #=> 131262 # groupByしてcollect_listして users = df.groupBy('userId') \ .agg( F.collect_list('movieId').alias('movieIds'), F.collect_list('rating').alias('ratings'), F.max('label').alias('label')) \ .withColumn('features', createSparseVector(F.col('movieIds'), F.col('ratings'), F.lit(max_id))) \ .select('userId', 'label', 'features') users.take(3) #=> [Row(userId='11888', label=0, features=SparseVector(131262, {2: 3.0, 10: 1.0, 11: 3.0, ... #=> Row(userId='13192', label=0, features=SparseVector(131262, {1: 4.5, 47: 4.0, 110: 4.5, ... #=> Row(userId='13772', label=0, features=SparseVector(131262, {11: 4.0, 17: 4.0, 21: 4.0, ...
続いてid=4386がVectorにいると学習できないので0を投入する。
import numpy as np # id=4386だけ0にするudf @F.udf(VectorUDT()) def remove_4386(vec): # 4386がなければそのまま返す if not 4386 in vec.indices: return vec # 4386があれば arr = vec.toArray() arr[4386] = 0.0 indices = np.arange(vec.size) indices = indices[arr != 0.0] arr = arr[arr != 0.0] return Vectors.sparse(vec.size, indices, arr) users = users.withColumn('features', remove_4386(users.features))
LogisticRegressionの実行。
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel lr = LogisticRegression(maxIter=100, regParam=0.1, elasticNetParam=0.1) model = lr.fit(users.repartition(3))
coefficeintsの中身を確認して相関の高そうなmovieIdの一覧を取得する
coefs = list(zip(model.coefficients.indices, model.coefficients.values)) top_ids = sorted(coefs, key=lambda x: x[1], reverse=True)[0:20]
movielensのmovies.csvを読み込んでタイトルに変換する。
movies = spark.read \ .option('header', 'true') \ .csv('ml-20m/movies.csv') \ .select('movieId', 'title') # top_idsをDataFrameに変換 import pandas as pd pdf = pd.DataFrame(top_ids, columns=['movieId', 'coef']) coefs = spark.createDataFrame(pdf) # 上位のタイトルを表示 coefs.join(movies, coefs.movieId == movies.movieId) \ .select(movies.movieId, movies.title, coefs.coef).collect() #=> [Row(movieId='4368', title='Dr. Dolittle 2 (2001)', coef=0.19463812359872854), #=> Row(movieId='4232', title='Spy Kids (2001)', coef=0.12701569494431736), #=> Row(movieId='5419', title='Scooby-Doo (2002)', coef=0.12042625825004065), #=> Row(movieId='4366', title='Atlantis: The Lost Empire (2001)', coef=0.08595671796088702), #=> Row(movieId='4719', title='Osmosis Jones (2001)', coef=0.07427676977612716), #=> Row(movieId='5480', title='Stuart Little 2 (2002)', coef=0.06691716872768952), #=> Row(movieId='4728', title='Rat Race (2001)', coef=0.06325315416027621), #=> Row(movieId='4890', title='Shallow Hal (2001)', coef=0.05369506968028268), #=> Row(movieId='4990', title='Jimmy Neutron: Boy Genius (2001)', coef=0.05261935153726506), #=> Row(movieId='4700', title='Princess Diaries, The (2001)', coef=0.05097417203787525), #=> Row(movieId='4343', title='Evolution (2001)', coef=0.050362525848393694), #=> Row(movieId='5504', title='Spy Kids 2: The Island of Lost Dreams (2002)', coef=0.04910030091351041), #=> Row(movieId='3483', title='Road to El Dorado, The (2000)', coef=0.04643888177833009), #=> Row(movieId='4701', title='Rush Hour 2 (2001)', coef=0.04580668243210159), #=> Row(movieId='3988', title='How the Grinch Stole Christmas (a.k.a. The Grinch) (2000)', coef=0.04331058084512103), #=> Row(movieId='4270', title='Mummy Returns, The (2001)', coef=0.04213200832506191), #=> Row(movieId='4340', title='Animal, The (2001)', coef=0.03995047452751212), #=> Row(movieId='4519', title='Land Before Time, The (1988)', coef=0.03968070284321782), #=> Row(movieId='4638', title='Jurassic Park III (2001)', coef=0.03844839464140543), #=> Row(movieId='3157', title='Stuart Little (1999)', coef=0.038239553396115775)]
レコメンドに向いてるか微妙な処理の割にはまあそこそこの結果かも。
改定履歴
Author: Masato Watanabe, Date: 2019-09-14, 記事投稿