iMind Developers Blog

iMind開発者ブログ

PySparkでDataFrameのランダムサンプリング

概要

PySparkでDataFrameから指定の割合や指定のレコード数でランダムサンプリングする。

バージョン情報

  • PySpark 2.4.3

サンプルデータ

1〜9999までの数値を持つDataFrameを用意する。

import pandas as pd
import numpy as np
np.random.seed(0)

# 0〜100までの値がランダムに入った10行のデータを用意
pandas_df = pd.DataFrame(
    np.arange(1, 10000),
    columns=['A'])
pandas_df.head(3)
    #=>   A
    #=>0  1
    #=>1  2
    #=>2  3

# SparkのDataFrameに変換してtmp_exampleに保存
df = spark.createDataFrame(pandas_df)
df.take(3)
    #=> [Row(A=1), Row(A=2), Row(A=3)]

ここからランダムでレコードを取得する

割合でサンプリング

DatFrame.sampleで指定した割合でサンプリングできる。

1%でサンプリングして取得したレコード数を確認。

# 1回目
df.sample(0.01).count()
    #=> 107

# 2回目
df.sample(0.01).count()
    #=> 101

# 3回目
df.sample(0.01).count()
    #=> 98

ランダムなので値がバラつくが、概ね1万の1% = 100の周辺の結果になる。

続いてランダムで取得した上10件を取ってみる。

 df.sample(0.01).take(10)
    #=> [Row(A=35),
    #=>  Row(A=65),
    #=>  Row(A=68),
    #=>  Row(A=276),
    #=>  Row(A=342),
    #=>  Row(A=383),
    #=>  Row(A=400),
    #=>  Row(A=581),
    #=>  Row(A=727),
    #=>  Row(A=772)]

このようにshuffleはされておらず、事前のソート順で並んでいる。

並び順もランダムにしたい場合はサンプリングした後にランダムソートする。

from pyspark.sql import functions as F

df.sample(0.01).orderBy(F.rand()).take(10)
    #=> [Row(A=5061),
    #=>  Row(A=2695),
    #=>  Row(A=4697),
    #=>  Row(A=478),
    #=>  Row(A=1560),
    #=>  Row(A=9586),
    #=>  Row(A=1664),
    #=>  Row(A=503),
    #=>  Row(A=7161),
    #=>  Row(A=2841)]

指定レコード数でサンプリング

上の例では割合でサンプリングしたが、何レコード取得したいか指定したい場合。

指定数をサンプリングする方法としてtakeSampleがある。rddに対して実行し、結果をrddではなくlistとして返す。

# 100件の結果を取得。1つ目の引数の意味はwithReplacement。
lst = df.rdd.takeSample(False, 100)
lst[0:5]
    #=> [Row(A=1386), Row(A=715), Row(A=7559), Row(A=972), Row(A=137)]

# parallelizeで再度DataFrameにしてみる
df2 = spark.sparkContext.parallelize(lst)
df2.count()
    #=> 100

このやり方はちょっと抵抗があるので少し多めにsampleでレコードを取っておいてrandom sortしてlimitでも良いだろうか(sampleしなくてもできるけど)。

# 欲しい数
SAMPLE_COUNT = 100

# 割合をそのものの数の1.2倍くらいにしておく
rate = SAMPLE_COUNT / df.count() * 1.2

# samplingして、randomソートして、指定数を取る
sample_df = df.sample(rate).orderBy(F.rand()).limit(SAMPLE_COUNT)
sample_df.count()
    #=> 100

sample_df.take(10)
    #=> [Row(A=5184),
    #=>  Row(A=7224),
    #=>  Row(A=3529),
    #=>  Row(A=8806),
    #=>  Row(A=4612),
    #=>  Row(A=1396),
    #=>  Row(A=1674),
    #=>  Row(A=1936),
    #=>  Row(A=5302),
    #=>  Row(A=8840)]

割合を決めて複数のDataFrameに分割する

randomSplitはDataFrameを指定した割合で分割する。

下記は1%, 2%, 97%の3つに分割する例。

df1, df2, df3 = df.randomSplit([0.01, 0.02, 0.97])

df1.count()
    #=> 96
df2.count()
    #=> 226
df3.count()
    #=> 9677

df.sampleの時と同じように上から順に確率で分割していくので、実行結果の順序は実行前のDataFrameの内容が保たれている。

df1.take(10)  
    #=> [Row(A=12),
    #=>  Row(A=110),
    #=>  Row(A=161),
    #=>  Row(A=352),
    #=>  Row(A=355),
    #=>  Row(A=374),
    #=>  Row(A=390),
    #=>  Row(A=462),
    #=>  Row(A=554),
    #=>  Row(A=700)]

改定履歴

Author: Masato Watanabe, Date: 2019-09-08, 記事投稿