iMind Developers Blog

iMind開発者ブログ

PythonでBigQueryの操作

概要

Pythonを利用してBigQueryのデータの追加、削除、検索を行う。

データはpandasで用意して投入する想定。

バージョン情報

  • Python 3.7.4
  • google-cloud-bigquery==1.21.0

導入

$ pip install google-cloud-bigquery

BigQueryとCloud Storageが許諾されているサービスアカウントを用いて実行する想定。gcloudは既に入っているものとする。

参考サイト

https://googleapis.dev/python/bigquery/latest/index.html

サンプルデータ

MovieLensに含まれるmovies.csvをサンプルデータとして利用する。

https://grouplens.org/datasets/movielens/

import pandas as pd

# movies.csvの読み込み
df = pd.read_csv('movies.csv')

# genresを配列に変換
df['genres'] = df.genres.apply(lambda g: g.split('|'))

df.head(3)
    #=>       movieId                    title                                             genres
    #=> 0        1         Toy Story (1995)  [Adventure, Animation, Children, Comedy, Fantasy]
    #=> 1        2           Jumanji (1995)                     [Adventure, Children, Fantasy]
    #=> 2        3  Grumpier Old Men (1995)                                  [Comedy, Romance]

サービスアカウントでのBigQueryへの接続

jsonファイルを指定してclientを生成する。

from google.cloud import bigquery

KEY_PATH = 'キーファイルのパス'

bq_client = bigquery.Client.from_service_account_json(KEY_PATH)

authを自前で書く場合は下記。

from google.cloud import bigquery
from google.oauth2 import service_account

KEY_PATH = 'キーファイルのパス'

# auth認証
credentials = service_account.Credentials.from_service_account_file(KEY_PATH)

# clientの生成
bq_client = bigquery.Client(credentials=credentials)

データセットの生成

exampleという名前でデータセットを生成してみる。

bq_client.create_dataset('example')

生成されたか確認。

for ds in bq_client.list_datasets():
    print(ds.dataset_id)

    #=> example

テーブルの生成

サンプルデータの形式に合わせてテーブルのスキーマを指定してcreate_tableする。

ArrayのカラムはmodeをREPEATEDにする。

# スキーマ
schema = [
    bigquery.SchemaField("movieId", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("title", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("genres", "STRING", mode="REPEATED"),
]

# table_idを生成(形式 : project_name.dataset_id.table_name)
dataset = bq_client.get_dataset('example')
table_id = '%s.%s.movies' % (dataset.project, dataset.dataset_id)

# テーブル生成
table = bigquery.Table(table_id, schema=schema)
bq_client.create_table(table)

pandasからのデータの投入

pandasのDataFrameからそのままデータを投入する。

# Tableを取得
table = bigquery.Table(table_id, schema=schema)

# DataFrameを投入
bq_client.insert_rows_from_dataframe(table, df)

テーブルの削除

table = dataset.table('movies')

bq_client.delete_table(table)

jsonデータの投入

jsonでデータを投入してみる。

まずはDataFrameをjsonに変換。行ごとにjson化されたファイルを作ります。

with open('movies.json', 'wt') as fp:
    for idx, row in df.iterrows():
        fp.write('{}\n'.format(row.to_json()))
$ head movies.json

{"movieId":1,"title":"Toy Story (1995)","genres":["Adventure","Animation","Children","Comedy","Fantasy"]}
{"movieId":2,"title":"Jumanji (1995)","genres":["Adventure","Children","Fantasy"]}
{"movieId":3,"title":"Grumpier Old Men (1995)","genres":["Comedy","Romance"]}

これをcloud storageにアップロード。

from google.cloud import storage

KEY_FILE = 'cloud storageを触れるキーファイル'
BUCKET_NAME = 'ファイルを置くバケット'

# clientの生成
storage_client = storage.Client.from_service_account_json(KEY_FILE)

# bucketの取得
bucket = storage_client.get_bucket(BUCKET_NAME)

# ファイルのアップロード
blob = bucket.blob('example/movies.json') 
blob.upload_from_filename('movies.json')

これでjsonファイルが example/movies.json というパスでアップロードされた。

置いたデータをBigQueryに投入する。

job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
uri = "gs://%s/example/movies.json" % BUCKET_NAME

load_job = bq_client.load_table_from_uri(
    uri,
    dataset.table("movies"),
    job_config=job_config,
)

検索

# クエリの実行
query_job = bq_client.query("""
    SELECT * FROM example.movies LIMIT 50""")

# これがなくても下のresult()で処理が終わるまでwaitするけど、動いてる風の表示として
import time
while not query_job.done:
    time.sleep(1)
    print('.', end='')

# 結果の表示
results = query_job.result()
for row in results:
    print(row.values())

    #=> (777, "Pharaoh's Army (1995)", ['War'])
    #=> (966, 'Walk in the Sun, A (1945)', ['War'])
    #=> (1450, 'Prisoner of the Mountains (Kavkazsky plennik) (1996)', ['War'])

pandasのDataFrameに変換することもできる。

results = query_job.result()

movies = results.to_dataframe()
movies.head(2)

    #=>    movieId                      title genres
    #=>    0      777      Pharaoh's Army (1995)  [War]
    #=>    1      966  Walk in the Sun, A (1945)  [War]

改定履歴

Author: Masato Watanabe, Date: 2019-12-08, 記事投稿