概要
WikipediaのXMLファイルをSpark上で読み込んでみたかったので、SparkでのXMLの読み書きについて学習する。
バージョン情報
- spark-2.3.1
- Python 3.5.5
サンプルデータ
最初からWikipediaのデータを扱うのは怖いので(bzip2で固めて2.6GB)、まずは小さなサンプルデータを使って読み書きさせる。
試しにはてなブックマークのRSSを取り扱ってみる。下記はテクノロジカテゴリの新着RSS。
http://b.hatena.ne.jp/entrylist/it.rss
1つのitemタグの中に記事の情報が入っている構成。下記はitemタグの例(一部省略)。
<item rdf:about="(記事URL)"> <title>(記事タイトル)</title> <link>https://note.mu/ogawa0071/n/n0df53a249de8</link> <description>(description)</description> <dc:date>2018-12-07T14:38:02Z</dc:date> <dc:subject>(subject1)</dc:subject> <dc:subject>(subject2)</dc:subject> <hatena:bookmarkcount>16</hatena:bookmarkcount> </item>
サンプルデータの読み込み
サンプルデータをhadoopに配置する。パスはtmp/it.rssに置かれるものとする。
$ hadoop fs -put it.rss tmp/ $ hadoop fs -ls tmp/it.rss -rw-rw-r-- 3 user group 195661 2018-12-07 16:14 tmp/it.rss
PySparkを立ち上げる。Spark2.3.1の場合、packagesでspark-xmlを指定しないと下記のようなエラーが起こる。
: java.lang.ClassNotFoundException: Failed to find data source: xml. Please find packages at http://spark.apache.org/third-party-projects.html Caused by: java.lang.ClassNotFoundException: xml.DefaultSource
下記のように指定してpysparkを立ち上げる。
$ pyspark --packages com.databricks:spark-xml_2.11:0.4.1
--packagesが使えない環境の場合はjarをクライアント側で用意して--jarsで指定する。
$ wget https://repo1.maven.org/maven2/com/databricks/spark-xml_2.11/0.4.1/spark-xml_2.11-0.4.1.jar $ pyspark --jars spark-xml_2.11-0.4.1.jar
これでXMLが読めるようになった。
読み込む場合はformatでxmlを指定し、optionで読み込むrowTagとして今回のrow dataが入っているitemを指定する。
df = spark.read.format('xml') \ .option("rowTag", "item") \ .load("tmp/it.rss") df.select('_about', 'title', 'dc:date').take(3)
実行結果
[ Row(_about='https://note.mu/ogawa0071/n/n0df53a249de8', title='DMMを退職して、スタートアップを立ち上げました|小川楓太|note', dc:date='2018-12-07T14:38:02Z'), Row(_about='https://note.mu/dex1t/n/n0e96772c2698', title='コードを書きながらデザインする意味と効果 #ProductKitchen|Takaya Deguchi|note', dc:date='2018-12-07T13:09:17Z'), Row(_about='https://japanese.engadget.com/2018/12/07/pioneer/', title='パイオニアが上場廃止\u3000香港系ファンドの完全子会社に - Engadget 日本版', dc:date='2018-12-07T11:39:56Z') ]
attributeに入っていたaboutは_aboutという名前でRowに入っていた。
簡単にXMLからDataFrameを作れて便利。
XMLの出力
サンプルデータからtitle、dc:date、link、hatena:bookmarkcountの4つの要素を取得し、下記のようなXMLを生成したい。
<root> <item> <title>タイトル</title> <date>日時</date> <url>URL</url> <bookmarkcount>ブックマーク数</bookmarkcount> </item> </root>
まずはカラム名を変更する。
from pyspark.sql import functions as F df = df.select( \ 'title', \ F.col('dc:date').alias('date'), \ F.col('link').alias('url'), \ F.col('hatena:bookmarkcount').alias('bookmarkcount'))
続いてXMLの出力。optionでrootTagとrowTagを指定することができる。
df.write.format('xml') \ .option('rootTag', 'root') \ .option('rowTag', 'item') \ .save('items.xml')
出来上がったファイルをHadoop上で確認。
$ hadoop fs -text items.xml/*
<root> <item> <title>パイオニアが上場廃止 香港系ファンドの完全子会社に - Engadget 日本版</title> <date>2018-12-07T11:39:56Z</date> <url>https://japanese.engadget.com/2018/12/07/pioneer/</url> <bookmarkcount>9</bookmarkcount> </item> <中略/> </root>
出力できた。
WikipediaのXMLの読み込み
最後にWikipediaの2.6GB.bz2なXMLを読み込んでみる。
まずはHadoopに配置。
$ hadoop fs -put jawiki-latest-pages-articles.xml.bz2 tmp/jawiki-latest-pages-articles.xml.bz2
続いてXMLの読み込み。
$ pyspark --jars spark-xml_2.11-0.4.1.jar --num-executors 30
rowタグはpageという名前。
df = spark.read.format('xml') \ .option("rowTag", "page") \ .load("tmp/jawiki-latest-pages-articles.xml.bz2")
今回実行した環境ではExcecutorの数は21となった。ブロックサイズが128MBでファイルサイズが2.6GBなので妥当な数字。実行時間はおよそ4分だった。
repartitionとかcacheをしてから内容を確認してみる。
df.select('title').take(5)
実行結果
[Row(title='????????????'), Row(title='????????????'), Row(title='????????????????????????'), Row(title='?????????????????????'), Row(title='?????????????????????'),
・・・化けとるがな
optionのcharsetを指定しても解決しないらしい。(そもそもデフォルトはUTF-8)
# これでは解決しない spark.read.format('xml').option("charset", "UTF-8")
JAVA_TOOL_OPTIONSに-Dfile.encoding=UTF8を指定してすると治るらしいので全体的に加えてみる。
JAVA_TOOL_OPTIONS='-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8' \ pyspark \ --jars spark-xml_2.11-0.4.1.jar \ --conf "spark.yarn.appMasterEnv.JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8" \ --conf "spark.executorEnv.JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8"
これで無事取得できた。
[Row(title='Wikipedia:アップロードログ 2004年4月'), Row(title='Wikipedia:削除記録/過去ログ 2002年12月'), Row(title='アンパサンド'), Row(title='Wikipedia:Sandbox'), Row(title='言語')]
毎回指定するのは面倒なのでspark.confというファイルを作って下記を記述しておき、--properties-fileで読み込んでおくことにする。
spark.yarn.appMasterEnv.JAVA_TOOL_OPTIONS "-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8" spark.executorEnv.JAVA_TOOL_OPTIONS "-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8"
JAVA_TOOL_OPTIONS='-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8' \ pyspark \ --jars spark-xml_2.11-0.4.1.jar \ --properties-file spark.conf
改定履歴
Author: Masato Watanabe, Date: 2019-01-20, 記事投稿