概要
HBaseに対してがっつりinsertやselectする際はhiveと連携させると楽だった。
バージョン情報
- Hive 1.1.0
- HBase 1.2.0
参考情報
https://cwiki.apache.org/confluence/display/Hive/HBaseIntegration
keyとvalueだけを持つテーブルの生成
今回のサンプル用にnamespaceを作っておく。
$ hbase shell > create_namespace 'test'
hiveからHBaseStorageHandlerを指定してテーブル生成する。
hive側のテーブルはtest.table1。hbase側はtest:table1。
CREATE TABLE test.table1 (`key` string, `value` string) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf1:val') TBLPROPERTIES ('hbase.table.name' = 'test:table1', "hbase.mapred.output.outputtable" = 'test:table1');
hiveのテーブルはkeyとvalueをstringで持つ。
この2つのカラムを、3行目でhbase側のkeyとcf1:valというカラムに紐付けている。
hive側のテーブルの内容を確認。
$ hive -e "describe test.table1" key string from deserializer value string from deserializer
hbase側のテーブルの内容を確認。
$ hbase shell <<< "describe 'test:table1'" {NAME => 'cf1', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
hiveの他のテーブルからinsertする形でデータ投入ができる。
insert overwrite table test.table1 ( select foo, bar test.example_table limit 10000 );
これでテーブルにデータが投入される。
hiveから見る場合。
-- countしたり select count(*) from test.table1; -- selectしたり select * from test.table1 limit 10
hbase shellから見る場合。
# countしたり > count 'test:table1' # selectしたり > scan 'test:table1', LIMIT=>10
既存のhbaseのテーブルにhiveのテーブルを繋ぐ
既存のhbaseのテーブルと繋ぐ場合は、EXTERNALキーワードを使う。
test.table2というhiveのテーブルに、上の項で生成したtest:table1を紐付けてみる。
CREATE EXTERNAL TABLE test.table2 (`key` string, `value` string) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf1:val') TBLPROPERTIES ('hbase.table.name' = 'test:table1', "hbase.mapred.output.outputtable" = 'test:table1');
これでtest.table2に対してselectすると、hbaseのtest:table1の情報が取れる。
select * from test.table2 limit 10
大量データを投入する場合
投入するデータ量が多過ぎると下記のようなエラーに遭遇することがある。
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: org.apache.hadoop.hive.serde2.SerDeException: java.lang.IllegalArgumentException: Row length is 0
sparkからhbaseにbulk loadする方法に関する記事を見ても、regionの容量を超えるサイズを投入する場合は事前にsplitしてから投入している。
https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/
実行後にリージョンのデータに偏りが出るので必要に応じて(運用の許す範囲で)splitやcompactをしておく。
$ hbase shell <<< "split 'test:table1'" $ hbase shell <<< "compact 'test:table1'"
hbaseのqualifierをhiveのmapにマッピング
hbaseのqualifierは動的に指定ができるので、qualifierにどういった値が入るか不明確な場合はhive側のキーをmap型にして、mapping対象を cf1: のようにカラムファミリーだけ指定する。
CREATE TABLE test.table3 (`key` string, `value` map<string, string>) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf1:') TBLPROPERTIES ('hbase.table.name' = 'test:table3');
dictionaryを生成しつつinsertします。カラムfooがkeyに、barがqualifierに、barがvalueになります。
group byしてmapに変換する箇所はbrickhouseのcollectを利用しています。
insert overwrite table test.table3 select foo, collect(bar, baz) from test.example_table group by foo limit 100
これでhbaseに動的なqualifierを持つデータを投入することができます。
$ hbase shell <<< "scan 'test:table3', LIMIT => 10"
タイムスタンプをhiveのカラムに
マッピングするhbase側のカラムに :timestamp を指定することでhiveからhbaseのタイムスタンプを参照できる。
CREATE EXTERNAL TABLE test.table4 (`key` string, `value` string, `ts` timestamp) STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf1:val,:timestamp') TBLPROPERTIES ('hbase.table.name' = 'test:table1');
select * from test.table1 limit 3; foo1 bar1 2019-04-23 10:17:49.074 foo2 bar2 2019-04-23 10:26:48.317 foo3 bar3 2019-04-23 10:26:48.796
改定履歴
Author: Masato Watanabe, Date: 2019-04-24 記事投稿