概要
Pythonを利用してBigQueryのデータの追加、削除、検索を行う。
データはpandasで用意して投入する想定。
バージョン情報
- Python 3.7.4
- google-cloud-bigquery==1.21.0
導入
$ pip install google-cloud-bigquery
BigQueryとCloud Storageが許諾されているサービスアカウントを用いて実行する想定。gcloudは既に入っているものとする。
参考サイト
https://googleapis.dev/python/bigquery/latest/index.html
サンプルデータ
MovieLensに含まれるmovies.csvをサンプルデータとして利用する。
https://grouplens.org/datasets/movielens/
import pandas as pd # movies.csvの読み込み df = pd.read_csv('movies.csv') # genresを配列に変換 df['genres'] = df.genres.apply(lambda g: g.split('|')) df.head(3) #=> movieId title genres #=> 0 1 Toy Story (1995) [Adventure, Animation, Children, Comedy, Fantasy] #=> 1 2 Jumanji (1995) [Adventure, Children, Fantasy] #=> 2 3 Grumpier Old Men (1995) [Comedy, Romance]
サービスアカウントでのBigQueryへの接続
jsonファイルを指定してclientを生成する。
from google.cloud import bigquery KEY_PATH = 'キーファイルのパス' bq_client = bigquery.Client.from_service_account_json(KEY_PATH)
authを自前で書く場合は下記。
from google.cloud import bigquery from google.oauth2 import service_account KEY_PATH = 'キーファイルのパス' # auth認証 credentials = service_account.Credentials.from_service_account_file(KEY_PATH) # clientの生成 bq_client = bigquery.Client(credentials=credentials)
データセットの生成
exampleという名前でデータセットを生成してみる。
bq_client.create_dataset('example')
生成されたか確認。
for ds in bq_client.list_datasets(): print(ds.dataset_id) #=> example
テーブルの生成
サンプルデータの形式に合わせてテーブルのスキーマを指定してcreate_tableする。
ArrayのカラムはmodeをREPEATEDにする。
# スキーマ schema = [ bigquery.SchemaField("movieId", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("title", "STRING", mode="REQUIRED"), bigquery.SchemaField("genres", "STRING", mode="REPEATED"), ] # table_idを生成(形式 : project_name.dataset_id.table_name) dataset = bq_client.get_dataset('example') table_id = '%s.%s.movies' % (dataset.project, dataset.dataset_id) # テーブル生成 table = bigquery.Table(table_id, schema=schema) bq_client.create_table(table)
pandasからのデータの投入
pandasのDataFrameからそのままデータを投入する。
# Tableを取得 table = bigquery.Table(table_id, schema=schema) # DataFrameを投入 bq_client.insert_rows_from_dataframe(table, df)
テーブルの削除
table = dataset.table('movies')
bq_client.delete_table(table)
jsonデータの投入
jsonでデータを投入してみる。
まずはDataFrameをjsonに変換。行ごとにjson化されたファイルを作ります。
with open('movies.json', 'wt') as fp: for idx, row in df.iterrows(): fp.write('{}\n'.format(row.to_json()))
$ head movies.json {"movieId":1,"title":"Toy Story (1995)","genres":["Adventure","Animation","Children","Comedy","Fantasy"]} {"movieId":2,"title":"Jumanji (1995)","genres":["Adventure","Children","Fantasy"]} {"movieId":3,"title":"Grumpier Old Men (1995)","genres":["Comedy","Romance"]}
これをcloud storageにアップロード。
from google.cloud import storage KEY_FILE = 'cloud storageを触れるキーファイル' BUCKET_NAME = 'ファイルを置くバケット' # clientの生成 storage_client = storage.Client.from_service_account_json(KEY_FILE) # bucketの取得 bucket = storage_client.get_bucket(BUCKET_NAME) # ファイルのアップロード blob = bucket.blob('example/movies.json') blob.upload_from_filename('movies.json')
これでjsonファイルが example/movies.json というパスでアップロードされた。
置いたデータをBigQueryに投入する。
job_config = bigquery.LoadJobConfig() job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON uri = "gs://%s/example/movies.json" % BUCKET_NAME load_job = bq_client.load_table_from_uri( uri, dataset.table("movies"), job_config=job_config, )
検索
# クエリの実行 query_job = bq_client.query(""" SELECT * FROM example.movies LIMIT 50""") # これがなくても下のresult()で処理が終わるまでwaitするけど、動いてる風の表示として import time while not query_job.done: time.sleep(1) print('.', end='') # 結果の表示 results = query_job.result() for row in results: print(row.values()) #=> (777, "Pharaoh's Army (1995)", ['War']) #=> (966, 'Walk in the Sun, A (1945)', ['War']) #=> (1450, 'Prisoner of the Mountains (Kavkazsky plennik) (1996)', ['War'])
pandasのDataFrameに変換することもできる。
results = query_job.result() movies = results.to_dataframe() movies.head(2) #=> movieId title genres #=> 0 777 Pharaoh's Army (1995) [War] #=> 1 966 Walk in the Sun, A (1945) [War]
改定履歴
Author: Masato Watanabe, Date: 2019-12-08, 記事投稿