iMind Developers Blog

iMind開発者ブログ

PySparkでXMLファイルの読み書き

概要

WikipediaのXMLファイルをSpark上で読み込んでみたかったので、SparkでのXMLの読み書きについて学習する。

バージョン情報

  • spark-2.3.1
  • Python 3.5.5

サンプルデータ

最初からWikipediaのデータを扱うのは怖いので(bzip2で固めて2.6GB)、まずは小さなサンプルデータを使って読み書きさせる。

試しにはてなブックマークのRSSを取り扱ってみる。下記はテクノロジカテゴリの新着RSS。

http://b.hatena.ne.jp/entrylist/it.rss

1つのitemタグの中に記事の情報が入っている構成。下記はitemタグの例(一部省略)。

<item rdf:about="(記事URL)">
  <title>(記事タイトル)</title>
  <link>https://note.mu/ogawa0071/n/n0df53a249de8</link>
  <description>(description)</description>
  <dc:date>2018-12-07T14:38:02Z</dc:date>
  <dc:subject>(subject1)</dc:subject>
  <dc:subject>(subject2)</dc:subject>
  <hatena:bookmarkcount>16</hatena:bookmarkcount>
</item>

サンプルデータの読み込み

サンプルデータをhadoopに配置する。パスはtmp/it.rssに置かれるものとする。

$ hadoop fs -put it.rss tmp/
$ hadoop fs -ls tmp/it.rss
-rw-rw-r--   3 user group     195661 2018-12-07 16:14 tmp/it.rss

PySparkを立ち上げる。Spark2.3.1の場合、packagesでspark-xmlを指定しないと下記のようなエラーが起こる。

: java.lang.ClassNotFoundException: Failed to find data source: xml. Please find packages at http://spark.apache.org/third-party-projects.html

Caused by: java.lang.ClassNotFoundException: xml.DefaultSource

下記のように指定してpysparkを立ち上げる。

$ pyspark --packages com.databricks:spark-xml_2.11:0.4.1

--packagesが使えない環境の場合はjarをクライアント側で用意して--jarsで指定する。

$ wget https://repo1.maven.org/maven2/com/databricks/spark-xml_2.11/0.4.1/spark-xml_2.11-0.4.1.jar

$ pyspark --jars spark-xml_2.11-0.4.1.jar

これでXMLが読めるようになった。

読み込む場合はformatでxmlを指定し、optionで読み込むrowTagとして今回のrow dataが入っているitemを指定する。

df = spark.read.format('xml') \
    .option("rowTag", "item") \
    .load("tmp/it.rss")
df.select('_about', 'title', 'dc:date').take(3)

実行結果

[
  Row(_about='https://note.mu/ogawa0071/n/n0df53a249de8', title='DMMを退職して、スタートアップを立ち上げました|小川楓太|note', dc:date='2018-12-07T14:38:02Z'),
  Row(_about='https://note.mu/dex1t/n/n0e96772c2698', title='コードを書きながらデザインする意味と効果 #ProductKitchen|Takaya Deguchi|note', dc:date='2018-12-07T13:09:17Z'),
  Row(_about='https://japanese.engadget.com/2018/12/07/pioneer/', title='パイオニアが上場廃止\u3000香港系ファンドの完全子会社に - Engadget 日本版', dc:date='2018-12-07T11:39:56Z')
]

attributeに入っていたaboutは_aboutという名前でRowに入っていた。

簡単にXMLからDataFrameを作れて便利。

XMLの出力

サンプルデータからtitle、dc:date、link、hatena:bookmarkcountの4つの要素を取得し、下記のようなXMLを生成したい。

<root>
  <item>
    <title>タイトル</title>
    <date>日時</date>
    <url>URL</url>
    <bookmarkcount>ブックマーク数</bookmarkcount>
  </item>
</root>

まずはカラム名を変更する。

from pyspark.sql import functions as F

df = df.select( \
    'title', \
    F.col('dc:date').alias('date'), \
    F.col('link').alias('url'), \
    F.col('hatena:bookmarkcount').alias('bookmarkcount'))

続いてXMLの出力。optionでrootTagとrowTagを指定することができる。

df.write.format('xml') \
    .option('rootTag', 'root') \
    .option('rowTag', 'item') \
    .save('items.xml')

出来上がったファイルをHadoop上で確認。

$ hadoop fs -text items.xml/*
<root>
    <item>
        <title>パイオニアが上場廃止 香港系ファンドの完全子会社に - Engadget 日本版</title>
        <date>2018-12-07T11:39:56Z</date>
        <url>https://japanese.engadget.com/2018/12/07/pioneer/</url>
        <bookmarkcount>9</bookmarkcount>
    </item>
    <中略/>
</root>

出力できた。

WikipediaのXMLの読み込み

最後にWikipediaの2.6GB.bz2なXMLを読み込んでみる。

まずはHadoopに配置。

$ hadoop fs -put jawiki-latest-pages-articles.xml.bz2 tmp/jawiki-latest-pages-articles.xml.bz2

続いてXMLの読み込み。

$ pyspark --jars spark-xml_2.11-0.4.1.jar --num-executors 30

rowタグはpageという名前。

df = spark.read.format('xml') \
    .option("rowTag", "page") \
    .load("tmp/jawiki-latest-pages-articles.xml.bz2")

今回実行した環境ではExcecutorの数は21となった。ブロックサイズが128MBでファイルサイズが2.6GBなので妥当な数字。実行時間はおよそ4分だった。

repartitionとかcacheをしてから内容を確認してみる。

df.select('title').take(5)

実行結果

[Row(title='????????????'),
 Row(title='????????????'),
 Row(title='????????????????????????'),
 Row(title='?????????????????????'),
 Row(title='?????????????????????'),

・・・化けとるがな

optionのcharsetを指定しても解決しないらしい。(そもそもデフォルトはUTF-8)

# これでは解決しない
spark.read.format('xml').option("charset", "UTF-8")

JAVA_TOOL_OPTIONSに-Dfile.encoding=UTF8を指定してすると治るらしいので全体的に加えてみる。

JAVA_TOOL_OPTIONS='-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8' \
    pyspark \
    --jars spark-xml_2.11-0.4.1.jar \
    --conf "spark.yarn.appMasterEnv.JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8" \
    --conf "spark.executorEnv.JAVA_TOOL_OPTIONS=-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8"

これで無事取得できた。

[Row(title='Wikipedia:アップロードログ 2004年4月'),
 Row(title='Wikipedia:削除記録/過去ログ 2002年12月'),
 Row(title='アンパサンド'),
 Row(title='Wikipedia:Sandbox'),
 Row(title='言語')]

毎回指定するのは面倒なのでspark.confというファイルを作って下記を記述しておき、--properties-fileで読み込んでおくことにする。

spark.yarn.appMasterEnv.JAVA_TOOL_OPTIONS "-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8"
spark.executorEnv.JAVA_TOOL_OPTIONS "-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8"
JAVA_TOOL_OPTIONS='-Dfile.encoding=UTF8 -Dsun.jnu.encoding=UTF-8' \
    pyspark \
    --jars spark-xml_2.11-0.4.1.jar \
    --properties-file spark.conf

改定履歴

Author: Masato Watanabe, Date: 2019-01-20, 記事投稿