iMind Developers Blog

iMind開発者ブログ

PySparkでSparseVectorの生成と操作

概要

PySparkで機械学習系の処理をする際にSparseVectorに変換したいケースが多々あるのでそのあたりの処理を確認する。

バージョン情報

  • Spark 2.4.3

サンプルデータ

いつもお世話になっているMovieLensのデータで、ユーザーごとの映画のレビューをSparseVectorに変換してみる。

from pyspark.sql import functions as F, types as T

# MovieLensのratings.csvを読み込む
df = spark.read \
    .option('header', 'true') \
    .csv('ml-20m/ratings.csv')

# Stringになっているのでcastしておく
df = df.select(
    F.col('userId').cast(T.IntegerType()),
    F.col('movieId').cast(T.IntegerType()),
    F.col('rating').cast(T.FloatType()))

# こういうデータ
df.take(3)
    #=> [Row(userId=1, movieId=2, rating=3.5),
    #=>  Row(userId=1, movieId=29, rating=3.5),
    #=>  Row(userId=1, movieId=32, rating=3.5)]

このデータを { movieId : rating } のSparseVecotrに変換する。

SparseVectorの生成

groupByしてcollect_listしてSparseVecotrに渡す流れでやってみる。

SparseVectorに渡す引数はfeatureの数(今回だとmovieIdsのmax+1)、feature_id, weightの3つ。feature_idはソートされている必要がある。

from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# userIdとSparseVectorの入ったRowを返す
def createSparseVector(row, max_id):
    # movieIdsでソートしておく
    lst = list(zip(row.movieIds, row.ratings))
    lst = sorted(lst, key=lambda x: x[0])
    # SparseVectorの生成
    movieIds = [m for m, r in lst]
    ratings = [r for m, r in lst]
    features = Vectors.sparse(max_id, movieIds, ratings)
    # Rowに変換して返す
    return Row(userId=row.userId, features=features)

# movieIdのmax+1を取る
max_id = df.select(F.max('movieId').alias('maxId')).take(1)[0].maxId + 1
    #=> 131263

# groupByしてcollect_listして
users = df.groupBy('userId') \
    .agg(F.collect_list('movieId').alias('movieIds'), F.collect_list('rating').alias('ratings')) \
    .rdd.map(lambda row: createSparseVector(row, max_id))

users.take(3)
    #=> [Row(features=SparseVector(131263,, {1: 4.0, 25: 4.5, 32: 4.0, 47: 4.0, 110: 4.0, ... , userId='100263'),
    #=>  Row(features=SparseVector(131263,, {1: 4.0, 10: 4.0, 19: 1.0, 34: 1.0, 39: 3.0, ... , userId='104603'),
    #=>  Row(features=SparseVector(131263,, {377: 4.5, 497: 3.5, 608: 5.0, 784: 3.0, 832: 4.0, ... , userId='71627')]

これでMLlibとかでfitできるSparseVecotrが生成できた。

文字列からのSparseVectorの生成

SparseVecotrにはparseというメソッドが用意されている。

 |  parse(s)
 |      Parse string representation back into the SparseVector.
 |      
 |      >>> SparseVector.parse(' (4, [0,1 ],[ 4.0,5.0] )')
 |      SparseVector(4, {0: 4.0, 1: 5.0})

あまり使わなさそうなフォーマットだけど、localでこの形式で作っておいて読み込ませるということはできなくはない。

例えば下記のようにローカルで(Spark上でも良いですが)同フォーマットのファイルを作る

# 手元にpandasとして落とす
local_users = df.groupBy('userId') \
    .agg(F.collect_list('movieId').alias('movieIds'), F.collect_list('rating').alias('ratings')) \
    .toPandas()

local_users.head(3)
    #=>    userId          movieIds     ratings
    #=> 0  111710  [10, 50, 260, ...  [4.0, 4.0, 5.0, ...
    #=> 1  116194  [1, 2, 3, 4, 5, ,...  [3.0, 4.0, 3.0, ...
    #=> 2  117211  [1, 9, 14, 17, ,...  [5.0, 4.0, 4.0, ...

# movieIdsのmax+1の取得
max_id = max(local_users.movieIds.apply(max))+1
    #=> 131263

# rowを所定の形式に変換する
import json
def format_row(row, max_id):
    # ソートしておく
    lst = list(zip(row.movieIds, row.ratings))
    lst = sorted(lst, key=lambda x: x[0])
    movieIds = [m for m, r in lst]
    ratings = [r for m, r in lst]
    # 出力文字列
    return '(%d, %s, %s)' % (
        max_id,
        json.dumps(movieIds),
        json.dumps(ratings))

# userIdと所定形式のtsvで保存
import gzip
with gzip.open('sparse_ratings.tsv.gz', 'wt') as f:
    # header
    f.write('userId\tfeatures\n')
    # rows
    for idx, row in local_users.iterrows():
        f.write('%s\t%s\n' % (row.userId, format_row(row, max_id)))

このローカルファイルを読み込んでSparseVectorにしてみる。

# ローカルファイルをhdfsに置く
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
fs = FileSystem.get(sc._jsc.hadoopConfiguration())
fs.copyFromLocalFile(Path('sparse_ratings.tsv.gz'), Path('sparse_ratings.tsv.gz'))

# テキストとして読み込む
users = spark.read.option("delimiter", "\t") \
    .option('header', 'true') \
    .csv('sparse_ratings.tsv.gz')
users.take(3)
    #=> [Row(userId='100263', features='(131262, [1, 25, 32, 47, 110, ...
    #=>  Row(userId='104603', features='(131262, [1, 10, 19, 34, 39, ...
    #=>  Row(userId='71627', features='(131262, [377, 497, 608, 784, ...

# SparseVectorに変換
users = users.rdd.map(
    lambda row: Row(userId=row.userId, features=SparseVector.parse(row.features).asML())) \
    .toDF()
users.take(3)
    #=> [Row(features=SparseVector(131263, {1: 4.0, 25: 4.5, 32: 4.0, ... , userId='100263'),
    #=>  Row(features=SparseVector(131263, {1: 4.0, 10: 4.0, 19: 1.0, ... , userId='104603'),
    #=>  Row(features=SparseVector(131263, {377: 4.5, 497: 3.5, ... , userId='71627')]

SparseVector.parseした値にasML()を実行している。これはmllib用の形式に変換する処理で、実行しないとUDFを通す時や機械学習系の処理でエラーになることがある。

下記はエラーの一例。

TypeError: cannot serialize SparseVector
AttributeError: 'SparseVector' object has no attribute 'toML'

LibSVM形式からSparseVectorにする

LibSVMでデータを渡せばそのままSparseVectorに変換できる。

上で作ったlocal_usersの出力を少し変えてLibSVM形式で出力する。labelを設定する必要があるので便宜的にすべて0にする。

def format_row(row):
    features = ' '.join(['%d:%d' % (m, r) for m, r in zip(row.movieIds, row.ratings)]) 
    return '0 %s\n' % features

with gzip.open('sparse_ratings.libsvm.gz', 'wt') as f:
    for idx, row in local_users.iterrows():
        f.write(format_row(row))

これをformat('libsvm')で読むとSparseVectorが返る。

# ローカルファイルをhdfsに置く
fs.copyFromLocalFile(Path('sparse_ratings.libsvm.gz'), Path('sparse_ratings.libsvm.gz'))

# 読み込み
vec = spark.read.format('libsvm') \
     .option('numFeatures', max_id) \
     .load("sparse_ratings.libsvm.gz")
vec.take(3)
    #=> [Row(label=0.0, features=SparseVector(131262, {0: 4.0, 24: 4.0, ...
    #=>  Row(label=0.0, features=SparseVector(131262, {0: 4.0, 9: 4.0, ...
    #=>  Row(label=0.0, features=SparseVector(131262, {376: 4.0, 496: 3.0, ...

SparseVecotrのメソッド

SparseVecotorに用意されているメソッドをいくつか叩いてみる。

まずは1つのVectorを取得

v1 = users.take(2)[0].features
v2 = users.take(2)[1].features
v1
    #=> SparseVector(131262, {2: 3.0, 10: 1.0, 11: 3.0, 17: 2.0, ...

size, indices, valuesで生成する際に指定した引数の値が取れる。

v1.size
    #=> 131262

v1.indices
    #=>  array([ 2, 10, 11, 17, 19, 21, 25, 32, 34, 36 ...

v1.values
    #=> array([3., 1., 3., 2., 3., 3., 3., 2., 3.,3., 4., 4., 4., ...

numNonzerosでzero以外の値が設定されている要素の数が取れる。

v1.numNonzeros()
    #=> 116

squared_distanceで2つのベクトルの距離が計算できる。

v1.squared_distance(v2)
    #=> 1955.0

toArrayはndarrayに変換して返す。

v1.toArray()                                                                                                              
    #=> array([0., 0., 3., ..., 0., 0., 0.])

norm

v1.norm(1)
    #=> 366.0

dot

v1.dot(v2)
    #=> 43.0

添字で値の取得。

# 最初の要素は 2=3.0
v1
    #=>  SparseVector(131262, {2: 3.0, 10: 1.0, 11: 3.0, 

v1[2]
    #=> 3.0

SparseVecotrの値の更新

添字を指定して更新するような機能はないらしい。

v[2] = 5.0
    #=> TypeError: 'SparseVector' object does not support item assignment

仕方ないのでtoArrayして値を更新して再度SparseVectorにする。

こんな感じでいけそうだけど、これだとnonzeroの値も記録されてしまうらしい。

import numpy as np

# 2の値を5.0に変更
arr = v1.toArray()
arr[2] = 5.0

# 再度SparseVectorに
indices = np.arange(v1.size)
v1_dash = SparseVector(v1.size, indices, arr)
    #=> SparseVector(131262, {0: 0.0, 1: 0.0, 2: 5.0, 3: 0.0, 4: 0.0, ...

0は除外しておいた方が良さそう。

# 再度SparseVectorに
indices = np.arange(v1.size)
indices = indices[arr != 0.0]
arr = arr[arr != 0.0]
v1_dash = SparseVector(max_id, indices, arr)
    #=> SparseVector(131262, {2: 5.0, 10: 1.0, 11: 3.0, 17: 2.0, 19: 3.0, ...

# 元データと添字にズレがないか確認
v1
    #=> SparseVector(131262, {2: 3.0, 10: 1.0, 11: 3.0, 17: 2.0, 19: 3.0, ...

上記の例ではindex=2だけが値が書き換えられたSparseVectorを生成している。

one-hotのSparseVecotrの作成

np.onesで必要な数1を用意するだけだけど、よく使うのでコピペできるようサンプルを書いておく。

MovieLensのレーティングを無視してレビューしたかどうかだけを0, 1で持つSparseVectorの生成。

import numpy as np

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row
from pyspark.sql import types as T 
from pyspark.sql import functions as F

# userIdとSparseVectorの入ったRowを返す
def createSparseVector(row, max_id):
    features = SparseVector(max_id, sorted(row.movieIds), np.ones(len(row.movieIds)))
    return Row(userId=row.userId, features=features)

# movieIdのmaxを取る
max_id = df.select(F.max('movieId').alias('maxId')).take(1)[0].maxId + 1
    #=> 131262

# groupByしてcollect_listして
users = df.groupBy('userId') \
    .agg(F.collect_list('movieId').alias('movieIds'), F.collect_list('rating').alias('ratings')) \
    .rdd.map(lambda row: createSparseVector(row, max_id)) \
    .toDF()

users.take(3)
    #=> [Row(features=SparseVector(131262, {50: 1.0, 158: 1.0, 160: 1.0, ...
    #=>  Row(features=SparseVector(131262, {356: 1.0, 377: 1.0, 500: 1.0, ... 
    #=>  Row(features=SparseVector(131262, {110: 1.0, 112: 1.0, 260: 1.0, ...

学習器に放り込む

最後に、生成したVectorを使って何かしら学習をしてみる。

例としてCats & Dogs(id=4386)を見ている人を正解としてLogisticRegressionするコードでも書いてみる。

ちなみにCats & Dogsは視聴した人の数は2003人。ratingの平均は2.69らしい。全体のratingの平均が3.53なのでけっこう低い。なぜだ、名作じゃないか。

df.where(F.col('movieId') == '4386').count()
    #=> 2003

df.where(F.col('movieId') == '4386').select(F.avg('rating')).collect()
    #=> [Row(avg(rating)=2.691962056914628)] 

まずはラベルを入れる。Cats & Dogs(id=4386)を見た人を1、それ以外を0でラベルを入れる。

# 視聴=1, 未視聴=0
df = df.withColumn('label', F.when(df.movieId == 4386, 1).otherwise(0))

# 正しく値が入っているか確認(視聴した人が2003件なのでOK)
df.groupBy('label').count().collect()
    #=> [Row(label=1, count=2003), Row(label=0, count=19998260)]

SparseVectorに変換。labelカラムが増えたので今回はUDFで変換してみる。

UDFで扱う場合はtypeにVectorUDTを使う。

from pyspark.ml.linalg import Vectors, VectorUDT

# SparseVectorに
@F.udf(VectorUDT())
def createSparseVector(movieIds, ratings, max_id):
    # ソートしておく
    lst = list(zip(movieIds, ratings))
    lst = sorted(lst, key=lambda x: x[0])
    # SparseVectorの生成
    movieIds = [m for m, r in lst]
    ratings = [r for m, r in lst]
    return Vectors.sparse(max_id, movieIds, ratings)

# movieIdのmaxを取る
max_id = df.select(F.max('movieId').alias('maxId')).take(1)[0].maxId + 1
    #=> 131262

# groupByしてcollect_listして
users = df.groupBy('userId') \
    .agg(
        F.collect_list('movieId').alias('movieIds'),
        F.collect_list('rating').alias('ratings'),
        F.max('label').alias('label')) \
    .withColumn('features', createSparseVector(F.col('movieIds'), F.col('ratings'), F.lit(max_id))) \
    .select('userId', 'label', 'features')

users.take(3)
    #=> [Row(userId='11888', label=0, features=SparseVector(131262, {2: 3.0, 10: 1.0, 11: 3.0, ...
    #=>  Row(userId='13192', label=0, features=SparseVector(131262, {1: 4.5, 47: 4.0, 110: 4.5, ...
    #=>  Row(userId='13772', label=0, features=SparseVector(131262, {11: 4.0, 17: 4.0, 21: 4.0, ...

続いてid=4386がVectorにいると学習できないので0を投入する。

import numpy as np

# id=4386だけ0にするudf
@F.udf(VectorUDT())
def remove_4386(vec):
    # 4386がなければそのまま返す
    if not 4386 in vec.indices:
        return vec

    # 4386があれば
    arr = vec.toArray()
    arr[4386] = 0.0
    indices = np.arange(vec.size)
    indices = indices[arr != 0.0]
    arr = arr[arr != 0.0]
    return Vectors.sparse(vec.size, indices, arr)

users = users.withColumn('features', remove_4386(users.features))

LogisticRegressionの実行。

from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel

lr = LogisticRegression(maxIter=100, regParam=0.1, elasticNetParam=0.1)
model = lr.fit(users.repartition(3))

coefficeintsの中身を確認して相関の高そうなmovieIdの一覧を取得する

coefs = list(zip(model.coefficients.indices, model.coefficients.values))                                                   
top_ids = sorted(coefs, key=lambda x: x[1], reverse=True)[0:20]

movielensのmovies.csvを読み込んでタイトルに変換する。

movies = spark.read \
    .option('header', 'true') \
    .csv('ml-20m/movies.csv') \
    .select('movieId', 'title')

# top_idsをDataFrameに変換
import pandas as pd
pdf = pd.DataFrame(top_ids, columns=['movieId', 'coef'])
coefs = spark.createDataFrame(pdf)

# 上位のタイトルを表示
coefs.join(movies, coefs.movieId == movies.movieId) \
    .select(movies.movieId, movies.title, coefs.coef).collect()
    #=> [Row(movieId='4368', title='Dr. Dolittle 2 (2001)', coef=0.19463812359872854),
    #=>  Row(movieId='4232', title='Spy Kids (2001)', coef=0.12701569494431736),
    #=>  Row(movieId='5419', title='Scooby-Doo (2002)', coef=0.12042625825004065),
    #=>  Row(movieId='4366', title='Atlantis: The Lost Empire (2001)', coef=0.08595671796088702),
    #=>  Row(movieId='4719', title='Osmosis Jones (2001)', coef=0.07427676977612716),
    #=>  Row(movieId='5480', title='Stuart Little 2 (2002)', coef=0.06691716872768952),
    #=>  Row(movieId='4728', title='Rat Race (2001)', coef=0.06325315416027621),
    #=>  Row(movieId='4890', title='Shallow Hal (2001)', coef=0.05369506968028268),
    #=>  Row(movieId='4990', title='Jimmy Neutron: Boy Genius (2001)', coef=0.05261935153726506),
    #=>  Row(movieId='4700', title='Princess Diaries, The (2001)', coef=0.05097417203787525),
    #=>  Row(movieId='4343', title='Evolution (2001)', coef=0.050362525848393694),
    #=>  Row(movieId='5504', title='Spy Kids 2: The Island of Lost Dreams (2002)', coef=0.04910030091351041),
    #=>  Row(movieId='3483', title='Road to El Dorado, The (2000)', coef=0.04643888177833009),
    #=>  Row(movieId='4701', title='Rush Hour 2 (2001)', coef=0.04580668243210159),
    #=>  Row(movieId='3988', title='How the Grinch Stole Christmas (a.k.a. The Grinch) (2000)', coef=0.04331058084512103),
    #=>  Row(movieId='4270', title='Mummy Returns, The (2001)', coef=0.04213200832506191),
    #=>  Row(movieId='4340', title='Animal, The (2001)', coef=0.03995047452751212),
    #=>  Row(movieId='4519', title='Land Before Time, The (1988)', coef=0.03968070284321782),
    #=>  Row(movieId='4638', title='Jurassic Park III (2001)', coef=0.03844839464140543),
    #=>  Row(movieId='3157', title='Stuart Little (1999)', coef=0.038239553396115775)]

レコメンドに向いてるか微妙な処理の割にはまあそこそこの結果かも。

改定履歴

Author: Masato Watanabe, Date: 2019-09-14, 記事投稿