iMind Developers Blog

iMind開発者ブログ

pyarrowによるPythonでのHDFS操作

概要

PythonでHDFSの操作をしたかったので、pyarrowに入っているclientを使ってみる。

類似のライブラリは他にもいろいろあるけど、pyarrowはその中でもメンテナンスが活発に行われている方なので安心感がある。

バージョン情報

  • Python 3.7.3
  • pyarrow==0.11.1

導入

$ conda install pyarrow

接続

下記のconnectメソッドを呼び出してHDFSに接続する。

pyarrow.hdfs.HadoopFileSystem.connect(
    host='default',
    port=0,
    user=None,
    kerb_ticket=None,
    driver='libhdfs',
    extra_conf=None)

driverにはデフォルトのlibhdfsか、hdfs3をインストールしていればlibhdfs3を指定することもできる。今回はデフォルトのlibhdfsを利用する。

import pyarrow as pa
fs = pa.hdfs.HadoopFileSystem.connect()
fs.ls('.')

mkdir

mkdirはrecursiveにディレクトリ生成を行う。

fs.mkdir('foo/bar')
fs.ls('foo')
    #=> ['hdfs://localhost/user/masato.watanabe/foo/bar']

upload

localのファイルをhdfsにuploadする。

事前にuploadするファイルを作っておく。

$ echo "foobar" > foo.txt

生成したファイルをopenして、引数1にuploadする先のパス、引数2にstreamを指定してfs.uploadを実行する。

with open('foo.txt', 'rb') as f: 
    fs.upload('foo/bar/foo.txt', f) 
fs.ls('foo/bar')
    => ['hdfs://localhost/user/masato.watanabe/foo/bar/foo.txt']

downlaod

HDFSからlocalにファイルをダウンロードする。

with open('bar.txt', 'wb') as f:
    fs.download('foo/bar/foo.txt', f)

exists / isfile / isdir

fs.exists('foo/bar/foo.txt')
    #=> True

fs.isfile('foo/bar/foo.txt')
    #=> True

fs.isdir('foo/bar/foo.txt')
    #=> False

info / stat

infoはHDFS上のファイルの各種情報が取れる。statはファイルサイズと種別(ファイルかディレクトリか等)のみ取れる。

fs.info('foo/bar/foo.txt')

    #=> {'path': 'hdfs://localhost/user/masato.watanabe/foo/bar/foo.txt',
    #=>  'owner': 'masato.watanabe',
    #=>  'group': 'masato.watanabe',
    #=>  'size': 7,
    #=>  'block_size': 134217728,
    #=>  'last_modified': 1559815440,
    #=>  'last_accessed': 1559815439,
    #=>  'replication': 3,
    #=>  'permissions': 436,
    #=>  'kind': 'file'}
fs.stat('foo/bar/foo.txt')

    #=> {'size': 7, 'kind': 'file'}

cat

fs.cat('foo/bar/foo.txt')
    #=> b'foobar\n'

df / get_capacity / get_space_used

dfはfree space。

capacity = df + space_used になる。

fs.df()
    #=> 3095688357103

fs.get_capacity()
    #=> 6279730399592

fs.get_space_used()
    #=> 3184065361360

walk

配下のパスをrecursiveに見ていく。

戻り値は3つでディレクトリパス、ディレクトリ名、ファイル名が返る。

for dir_path, dir_name, file_name in fs.walk('foo'):
    print(dir_path, dir_name, file_name)

    #=> foo ['bar'] []
    #=> foo/bar [] ['foo.txt']

open

HDFSのファイルをopenする。

with fs.open('foo/bar/foo.txt', 'rb') as f:
    print(f.read())

    #=> b'foobar\n'

read_parquet

pyarrowなのでparquetファイルを読むようの関数も用意されている。

df = fs.read_parquet('parquet_file_path').to_pandas()
df.head()

最近のpandasはread_parquetでHDFSのurlを指定するだけで読める。pandasは日々便利になっていく。

df = pd.read_parquet('hdfs:///user/masato.watanabe/parquet_file_path')

rename / mv

mvはrenameのalias。

fs.rename('foo/bar/foo.txt', 'foo/bar/baz.txt')

改定履歴

Author: Masato Watanabe, Date: 2019-06-08, 記事投稿