iMind Developers Blog

iMind開発者ブログ

PySparkでWikipediaのXMLをパースしてword2vec

概要

PySparkを利用して日本語版Wikipediaの全文を取り込んでわかち書きし、word2vecに放り込んでみる。

XMLのパース、わかち書き、word2vec等の全行程をPySpark上で行う。

バージョン情報

  • spark-2.3.1
  • Python 3.5.5
  • Janome==0.3.6

Janomeの動く環境を用意

Spark上で利用する為にcondaでJanome入りのenvを固めておく。(Sparkクラスタ側に任意のライブラリを入れられる環境であればこの工程は必要ない)

参考 - PySparkで任意のライブラリを気軽に入れたい にリンクを貼る

condaでenvを作り、janomeをpipで入れ、zipで固めるというフロー。envの名前はjanome_envとしておく。

下記が実行コマンド。CONDA_HOMEはcondaがインストールされているパスを指定する。WORK_DIRは実行するスクリプトを置くパス。

$ WORK_DIR=~/work
$ CONDA_HOME=~/miniconda3
$ conda create -n janome_env --copy -q python=3.5
$ source activate janome_env
$ pip install janome
$ cd $CONDA_HOME/envs/
$ zip -r $WORK_DIR/janome_env.zip janome_env

これでWORK_DIR配下にjanome_env.zipが生成される。この環境を持ち回して形態素解析等の処理は実行する。

ローカルとクラスタでパス構成が一緒になっていると違いを意識しなくて済むのでシンボリックリンクでPythonのパスを揃えておく。

cd $WORK_DIR
ln -s $CONDA_HOME/envs/janome_env envs/janome_env

WORK_DIRにspark.confという名前で下記内容のファイルを作っておく

spark.pyspark.python ./envs/janome_env/bin/python
spark.pyspark.driver.python ./envs/janome_env/bin/python
spark.yarn.appMasterEnv.PYSPARK_PYTHON ./envs/janome_env/bin/python
spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON ./envs/janome_env/bin/python
spark.executorEnv.PYSPARK_PYTHON ./envs/janome_env/bin/python

spark.ui.showConsoleProgress True
spark.yarn.appMasterEnv.JAVA_TOOL_OPTIONS "-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8"
spark.executorEnv.JAVA_TOOL_OPTIONS "-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8"

下記のようにspark-submitするイメージ。

$ spark-submit \
    --master yarn \
    --properties-file spark.conf \
    --archives "$HOME/spark/envs/janome_env.zip#envs"
    your_python_file.py

WikipediaのXMLパースの例

SparkでのXMLのパースは spark.read.format('xml') のように記述できる。

当該jarがクラスタに配置されていない場合は下記のようなエラーになる。

java.lang.ClassNotFoundException: Failed to find data source: xml. Please find packages at http://spark.apache.org/third-party-projects.html
(中略)
Caused by: java.lang.ClassNotFoundException: xml.DefaultSource

下記URLなどからspark-xmlのjarを拾ってきて、実行時に --jars でオプション指定することにする。

https://mvnrepository.com/artifact/com.databricks/spark-xml_2.11

パースするデータはWikipediaの全文(jawiki-latest-pages-articles.xml.bz2)を落としてきてhadoop上に配置する。

https://dumps.wikimedia.org/jawiki/latest/

hadoop上に配置するパスは tmp/jawiki-latest-pages-articles.xml.bz2 とする。

$ hadoop fs -put jawiki-latest-pages-articles.xml.bz2 tmp/jawiki-latest-pages-articles.xml.bz2

id, title, textの3要素をXMLから取得する。pageタグ配下に各ページの情報が入っていて、id, titleがpageの直下に、textはrevision配下に入っている。

redirectタグが入っているページは他ページへのリダイレクトページなのでfilterで対象からは外すことにする。

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

src_path = 'tmp/jawiki-latest-pages-articles.xml.bz2'
dest_path = 'tmp/wikipedia_text'

df = spark.read.format('xml') \
    .option("rowTag", "page") \
    .load(src_path) \
    .filter(F.col('redirect').isNull()) \
    .select('id', 'title', F.col('revision')['text']['_VALUE'].alias('text')) \
    .repartition(100) \
    .write.format('csv') \
    .mode('overwrite') \
    .option('header', 'true') \
    .save(dest_path)

spark-submitの記述

$ spark-submit \
    --master yarn \
    --num-executors 25 \
    --jars spark-xml_2.11-0.4.1.jar \
    --properties-file spark.conf \
    --archives "$HOME/spark/envs/janome_env.zip#envs" \
    parse_xml.py

selectした結果として、下記のようなRowが出来上がる。

Row(id=11, title='日本語', text='{{wikt}}\n{{Infobox Language\n|name=日本語 ... (以下略)

dest_pathで指定されたパスにCSVで出力される。

$ hadoop fs -text tmp/wikipedia_text/* | head -2

id,title,text
81210,ファイル:Niseko-st.jpg,"{{NowCommons|File:Niseko-st.jpg}} (以下略)

XMLパース時のpartition数は当該環境では21になったので、今後ファイルサイズが増えることも想定して--num-executor は25としている。

XMLファイルのサイズが2.6GB。partition:21 * blick_size:128MB = xml_file_size:2.6GBということで納得のいく数字。

処理時間はだいたい6分。速いとは言わないけどこれだけ簡易な記述で6分で処理を終わらせてくれるならありがたい。

spark.confで下記の記述を入れているが、これを入れないと環境によっては文字化けする。

spark.yarn.appMasterEnv.JAVA_TOOL_OPTIONS "-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8"
spark.executorEnv.JAVA_TOOL_OPTIONS "-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8"

Janomeによるテキストのわかち書き

Janomeを使って先程のDataFrameのtextをわかち書きする。

Janomeのtokenizerは生起コストが大きめなのでmapで噛ませる関数では初期化したくない。また引数でtokenizerを渡そうとするとcan't pickleと言われる。

ということでそれなりのサイズにpartitionを分割し、flatMapで行ごとに分割した上でmapPartitionsでわかち書きを実行する。

spark.driver.memoryが2G、repartitionを3000とした場合でスムーズに処理が進行した。

形態素解析のルールとしては、Unicode正規化を噛ませると記号が未知語の名詞に判定される為、記号類はRegexで取り除き、名詞、動詞、形容詞のみ取り出している。

from pyspark.sql import SparkSession, Row, functions as F
spark = SparkSession.builder.getOrCreate()

from janome.tokenizer import Tokenizer
from janome.analyzer import Analyzer
from janome import tokenfilter, charfilter
def wakati(text_list):
    ''' わかち書きしてスペース区切りに変換
    '''
    # analyzerの生成
    char_filters = [charfilter.UnicodeNormalizeCharFilter(), charfilter.RegexReplaceCharFilter('[!-/:-@[-`{-~]', ' ')]
    token_filters = [tokenfilter.POSKeepFilter(['名詞', '動詞', '形容詞']), tokenfilter.LowerCaseFilter()]
    tokenizer = Tokenizer()
    analyzer = Analyzer(char_filters, tokenizer, token_filters)

    # わかち書き
    for text in text_list:
        for line in text.split('\n'):
            base_forms = [token.base_form for token in analyzer.analyze(line)]
            if len(base_forms) > 2:
                yield Row(wakati=' '.join(base_forms))

src_path = 'tmp/wikipedia_text'
dest_path = 'tmp/wikipedia_wakati'

spark.read.format('csv') \
    .option("header","true") \
    .load(src_path) \
    .select('text') \
    .repartition(1000) \
    .filter(F.col('text').isNotNull()) \
    .rdd.flatMap(lambda row: row[0].splitlines()) \
    .repartition(3000) \
    .mapPartitions(wakati) \
    .toDF() \
    .repartition(10) \
    .write.format('csv') \
    .mode('overwrite') \
    .save(dest_path)

spark-submitの記述

$ spark-submit \
    --master yarn \
    --num-executors 25 \
    --jars spark-xml_2.11-0.4.1.jar \
    --properties-file spark.conf \
    --archives "$HOME/spark/envs/janome_env.zip#envs" \
   wakati.py

wiki記法の除去

これでわかち書きまで実行できた。ただXMLのtextの部分はWiki記法で書かれているので邪魔な文字も残ってしまう。

151 ページ ref ニューヨーク 輸送 日数 1 ヶ月 早い なる ダラー ライン スケジュール 厳守 会社 全体 評判 高まる ref group 注 ロバート ダラー ポリシー スケジュール 維持 会社 評判 繋がる 考える いる 船長 スケジュール 厳守 求める 積み込み 予定 貨物 色分け 船舶 通信 使う 入港 情報 行う これら ダラー ライン なる 荷主 側 保険 料 節約 メリット 与える スケジュール 厳守 求める の ダラー ライン 最初 ない ダラー ライン これ 慎重 検討 する 末 運用 する 結果 後 他社 業種 波及 する いく 三浦
william editors revolution in perspective essays on the hungarian soviet republic of 1919
トゥールーズ fc 等 国外 クラブ 興味 示す アモリム 本人 残留 希望 する クラブ 側 入札 拒否 する シーズン 29 試合 2
生年 21 7
出典 明記 date 2013 年 4 月 20 日 土 17 23 utc 出典 皆無 ため
file great lakes svg thumb 250 px 五大 湖
678 ファイル map of arkansas highlighting white county svg 80 px
830 mm 5

これもSpark上で除去してみる。

下記に WikiExtractor.py というWiki記法を整形してくれるモジュールがある。

http://medialab.di.unipi.it/wiki/Wikipedia_Extractor

Python2向けなので落としてきたら2to3でPython3向けに変換。

$ 2to3 -w WikiExtractor.py

global escape_docが邪魔なので下記の記述をしているところを除去する。

if escape_doc:

例として下記の文字列を整形してみる。

id = 3244410
title = '売ります。赤ん坊の靴'
text = """
{{Portal|文学}}
[[File:Classic baby shoes.jpg|thumb|right|小児用の靴にまつわる、英語で6語の「小説」は、フラッシュ・フィクションを体現する作品とされている]]
「'''売ります。赤ん坊の靴。未使用'''」(''For sale: baby shoes, never worn'')は、英語で6単語からなる短編小説の全文である<ref>高遠裕子の訳を使用した {{Cite book |title=未来を発明するためにいまできること スタンフォード大学 集中講義II |author=ティナ・シーリグ |others=高遠裕子(翻訳) |publisher  = CCCメディアハウス |year=2012 |url=https://books.google.co.jp/books?id=pnGhAgAAQBAJ&pg=PT77&dq=%E5%A3%B2%E3%82%8A%E3%81%BE%E3%81%99%E3%80%82%E8%B5%A4%E3%82%93%E5%9D%8A%E3%81%AE%E9%9D%B4%E3%80%82%E6%9C%AA%E4%BD%BF%E7%94%A8&hl=ja&sa=X&ei=xNqbVcDRC5P_8QXnjoHADg&ved=0CDQQ6AEwAA#v=onepage&q=%E5%A3%B2%E3%82%8A%E3%81%BE%E3%81%99%E3%80%82%E8%B5%A4%E3%82%93%E5%9D%8A%E3%81%AE%E9%9D%B4%E3%80%82%E6%9C%AA%E4%BD%BF%E7%94%A8&f=false}}</ref>。このシックスワード・ノベルは、[[フラッシュフィクション]]、サドゥンフィクションの極端な例でもある。[[アーネスト・ヘミングウェイ]]の作品とされることが多いが、ヘミングウェイへの帰属は確かなものではなく、同じような趣向の小話は古くから存在する。

== 背景 ==
ヘミングウェイが作者とされる場合、たいてい他の作家たちとの[[賭け]]の場面がクローズアップされる。1992年にSF作家の[[アーサー・C・クラーク]]がカナダのユーモア作家[[ジョン・ロバート・コロンボ]]([[:en:John Robert Colombo]])にあてた手紙ではこう語られている。友人たちと昼食をとったレストラン(リューホーズともザ・アルゴンキンともいわれている)で、ヘミングウェイは6つの単語で全ストーリーをつくってみせるほうに10ドル、といってテーブルの親になった。賭け金がそろうと、ヘミングウェイはナプキンに「売ります。赤ん坊の靴。未使用」と書いてテーブルに回した。そして彼は賭け金を総取りしたのである<ref name=\"quote\">{{cite web|url=http://quoteinvestigator.com/2013/01/28/baby-shoes/|author=Garson O’Toole|title=For Sale, Baby Shoes, Never Worn|publisher=quoteinvestigator.com|date=January 28, 2013|accessdate=19 April 2013}}</ref>。
"""
from WikiExtractor import Extractor
extractor = Extractor(id, title, text)
print(extractor.clean())

実行結果

「売ります。赤ん坊の靴。未使用」("For sale: baby shoes, never worn")は、英語で6単語からなる短編小説の全文である。このシックスワード・ノベルは、フラッシュフィクション、サドゥンフィクションの極端な例でもある。アーネスト・ヘミングウェイの作品とされることが多いが、ヘミングウェイへの帰属は確かなものではなく、同じような趣向の小話は古くから存在する。

== 背景 ==
ヘミングウェイが作者とされる場合、たいてい他の作家たちとの賭けの場面がクローズアップされる。1992年にSF作家のアーサー・C・クラークがカナダのユーモア作家ジョン・ロバート・コロンボ()にあてた手紙ではこう語られている。友人たちと昼食をとったレストラン(リューホーズともザ・アルゴンキンともいわれている)で、ヘミングウェイは6つの単語で全ストーリーをつくってみせるほうに10ドル、といってテーブルの親になった。賭け金がそろうと、ヘミングウェイはナプキンに「売ります。赤ん坊の靴。未使用」と書いてテーブルに回した。そして彼は賭け金を総取りしたのである。

これをwakatiする前のmapperに挟めば良さそう。

from WikiExtractor import Extractor
def extractWiki(row):
    extractor = Extractor(0, '', row[0])
    return Row(text=extractor.clean())

spark.read.format('csv') \
    .option("header","true") \
    .load(src_path) \
    .select('text') \
    .repartition(1000) \
    .filter(F.col('text').isNotNull()) \
    .rdd \
    .map(extractWiki) \  # ← ここを追加
    .flatMap(lambda row: row[0].splitlines()) \
    .repartition(3000) \
    .mapPartitions(wakati) \
    .toDF() \
    .repartition(10) \
    .

spark-submit時の引数に --py-files WikiExtractor.py を追加して実行

infobox 自動車 スペック 表
セオドア・シャピロ うつ病 力動 的 精神療法 平島 奈津子 監訳 金 剛 出版 2010
鹿児島 県 奄美 地方 奄美 群島 トカラ 列島 カテゴリ 奄美 地方 属する 島 下位 カテゴリ 参照
iupac iupac nomenclature of organic chemistry lt ref gt 以外 拡張 hantzsch widman 命名 法 命名 する れる lt ref gt rule b 1
892 世帯 平成 23 年度
スタッフォードシャー 地方 自治 カテゴリ
000 mildura accessdate 2012 12 20 publisher itf language 英語 lt ref gt lt ref gt cite web url http www itftennis com procircuit tournaments men s tournament info aspx tournamentid 1100025100 title 25
robert e page
芹沢 栄 せり ざわ さかえる 1908 年 英語 学者 英 文学 者 東京教育大学 東京成徳短期大学 名誉 教授

多少は減ったけどまだ余分な文字が残っている。stop wordsも入れておきたいところだけど長くなりそうなので今回はこのへんでやめておく。

実行時間はexecutorが25個の場合で5分程度だった。

word2vec

word2vecの使い方は下記に載っている。

https://spark.apache.org/docs/2.3.1/mllib-feature-extraction.html#word2vec

setNumPartitionsを設定しないとデフォルトの1で実行されてしまうのでそのあたりを設定しつつfitを実行。次元数は128にしてみる。

from pyspark.mllib.feature import Word2Vec, Word2VecModel

inp = sc.textFile("tmp/wikipedia_wakati") \
    .map(lambda row: row.split(" ")) \
    .repartition(25)

w2v = Word2Vec() \
    .setNumPartitions(25) \
    .setNumIterations(25) \
    .setVectorSize(128) 
model = w2v.fit(inp)

実行時間は上記設定で10分弱。

setNumPartitionsについては「Use a small number for accuracy」とされているので増やせば良いというものでもないらしい。どう設定するのがベストか評価が難しい。

モデルが出来上がったらsave。あとloadも実行。

# save
model.save(sc, 'tmp/w2v_model')

# load
model = Word2VecModel.load(sc, 'tmp/w2v_model')

近いベクトルの単語を取得。

for word, sim in model.findSynonyms('山', 5):
    print(word, sim)
麓 0.8781297206878662
里 0.8755488991737366
通 0.8750510811805725
浦 0.8707529306411743
宿 0.8688519597053528
for word, sim in model.findSynonyms('川', 5):
    print(word, sim)
湖 0.8764530420303345
流域 0.8690384030342102
沿い 0.8547694683074951
河口 0.846468448638916
湾 0.8459759950637817

ベクトルの取得

model.transform('山')
DenseVector([1.3310, -4.6801, 3.6742, 1.9274, 7.2783, -3.0701, 1.5047, 5.2106, 7.5130, 3.4534, 9.9888, 8.7048, 1.0413, -6.0614, 3.6898, 5.4026, -1.2689, 8.2182, -9.3869, -1.0365, -7.6032, 1.2929, 1.0305, -2.5400, -1.8125, -7.3704, -1.5790, 3.8059, 4.5154, -1.5701, 9.7890, -6.1342, -1.6022, 9.0422, -6.4013, -1.0385, 2.1858, 1.0760, 7.0732, 3.4539, -1.5528, -3.6129, 7.4910, -1.5653, -5.3181, -1.1181, 4.9400, 9.9995, -1.1634, -1.0074, -5.9381, 1.5782, -6.8239, 1.0323, -6.7240, -3.6017, -7.0258, 7.9946, -8.3069, -1.9975, 5.1958, 5.1920, 2.2189, 1.9459, 3.1895, 6.9407, -3.1032, 1.1938, -3.8011, -7.2929, 4.1778, 5.0121, 7.3525, 8.0797, -1.0205, 3.6648, 4.3719, 1.2205, -1.5703, -5.6045, -9.1311, -5.7558, -5.9756, -7.6636, 3.1071, 5.8404, 5.0067, -1.0795, -1.0766, 2.7960, -4.2586, -4.8911, 1.4459, 4.6650, 2.3903, 1.2649, -5.9828, 1.0435, 3.9749, -4.6251, 7.7964, -1.2658, -1.2182, 8.5407, 7.8602, -1.0556, 9.4896, 6.2735, 4.3614, 2.0158, 6.9199, 6.6302, 1.3828, 1.8609, -5.8743, 9.3305, -1.2831, -6.4018, -9.0192, 9.3971, -8.1959, -2.4789, -8.8077, 1.8544, 1.0583, 2.1535, -2.6844, 1.0583])

ベクトルをローカルファイルに書き出す。

from tqdm import tqdm
import gzip

with gzip.open('vectors.txt.gz', 'wt') as w:
    for word in tqdm(model.getVectors()):
        w.write('{} {}\n'.format(word, ' '.join(map(lambda x: str(x), model.transform(word)))))

改定履歴

Author: Masato Watanabe, Date: 2019-01-27, 記事投稿