概要
PySparkでHDFS上のファイルをちょろっと操作したい時の為に、JavaのクラスをPySparkから呼び出してlsしたりwriteしたりrmするサンプルコードをまとめておく。
バージョン情報
- spark-2.3.1
- Hadoop 2.6.0
- Python 3.5.5
FileSystemの取得
まずは必要なJavaのクラスの定義。
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
hadoopのConfigurationはSparkContext内に入っているのでそれを使ってFileSystemを作る。
fs = FileSystem.get(sc._jsc.hadoopConfiguration())
フォルダの生成
FileSystemが取れたのでまずはフォルダの生成。
fs.mkdirs(Path('foo'))
ファイルの生成
ファイルを書き込む。
stream = fs.create(Path("foo/bar.txt")) try: stream.write(b'foobar') finally: stream.close()
ls
上の2項でfor/bar.txtというファイルができているはずなので、fooディレクトリを参照してbar.txtがあることを確認する。
for status in fs.listStatus(Path('foo')): print(status.getPath().getName())
実行結果
bar.txt
上記はファイルパスのみ取っているが、ファイルサイズ、更新日等、その他の情報についても取ってみる。
for status in fs.listStatus(Path('foo')): print('ファイルパス : %s' % status.getPath().getName()) print('ファイルサイズ : %d' % status.getLen()) print('アクセス日時 : %d' % status.getAccessTime()) print('更新日時 : %d' % status.getModificationTime()) print('ディレクトリ? : %r' % status.isDirectory()) print('ファイル? : %r' % status.isFile()) print('シンボリックリンク? : %r' % status.isSymlink())
実行結果
ファイルパス : bar.txt ファイルサイズ : 6 アクセス日時 : 1544783399370 更新日時 : 1544783399459 ディレクトリ? : False ファイル? : True シンボリックリンク? : False
exists
ファイルの存在チェック。
fs.exists(Path('foo/bar.txt'))
touch
空ファイルの生成。
fs.createNewFile(Path('foo/baz.txt')) # 生成確認 for status in fs.listStatus(Path('foo')): print(status.getPath().getName())
実行結果
bar.txt baz.txt
rm
ファイルの削除。
fs.delete(Path('foo/baz.txt')) # 消えたかどうか確認 for status in fs.listStatus(Path('foo')): print(status.getPath().getName())
実行結果(baz.txtが消えてる)
bar.txt
ローカルへのcopy
HDFS上のファイルをローカルにコピーする。
fs.copyToLocalFile(Path('foo/bar.txt'), Path('bar.txt'))
ローカルからHDFSへのcopy
ローカルのファイルをHDFSにコピーする。
fs.copyFromLocalFile(Path('bar.txt'), Path('foo/bar2.txt'))
rename
fs.rename(Path('foo/baz.txt'), Path('foo/baz2.txt'))
改定履歴
Author: Masato Watanabe, Date: 2019-01-31, 記事投稿