概要
PythonからElasticsearchにデータの登録、検索、削除を行う。
バージョン情報
- Python 3.6.8 (elasticsearch==6.3.1)
- Elasticsarch 6.5.4
導入
PythonのElasticsearchクライアントを入れておく。
$ pip install elasticsearch
ElasticsarchはDockerで立ち上げることにしておく。
下記のようにdocker-compose.ymlを記述。
version: "3" services: elasticsearch1: image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.5.4 container_name: elasticsearch1 hostname: elasticsearch1 environment: discovery.type: single-node ES_JAVA_OPTS: "-Xms4g -Xmx8g" ulimits: memlock: soft: -1 hard: -1 volumes: - './es_data:/usr/share/elasticsearch/data' ports: - '9200:9200' - '9300:9300'
起動
$ docker-compose up
接続確認
$ curl http://localhost:9200/
サンプルデータ
例としてProject Gutenbergからテキスト形式の小説を3冊ほど持ってくる。
https://www.gutenberg.org/browse/scores/top
本例では黄色い壁紙、ドリアングレイの肖像、ピュグマリオンの3冊を持ってきた。そういう気分の日もある。
ダウンロードしたテキストはnovelsというディレクトリに入れておく。
Project Gutenbergの書籍には本文の他に下記などの情報が付加されている。
- Title
- Author
- Release Date
- Last Updated
- Language
下記のようなコードでdictionaryに変換する。
def file2dic(file_path): book = {'content': ''} with open(file_path, 'rt') as f: state = 'header' for line in f: if state == 'header': if line.startswith('Title: '): book['title'] = line.split(':')[1].strip() if line.startswith('Author: '): book['author'] = line.split(':')[1].strip() elif line.startswith('Release Date: '): book['release_date'] = line.split(':')[1].strip() elif line.startswith('Last Updated: '): book['last_updated'] = line.split(':')[1].strip() elif line.startswith('Language: '): book['language'] = line.split(':')[1].strip() elif line.startswith('*** START '): state = 'body' elif state == 'body': if line.startswith('*** END '): break book['content'] += line return book
indexの生成
まずはスキーマ定義から。release_dateとかはtimestampにもできるけどパース書くの面倒なのでtextで入れておく。細かい指定は割愛。
import elasticsearch mapping = { "mappings" : { "_doc" : { "properties" : { "author": {"type":"text"}, "release_date": {"type":"text"}, "last_updated": {"type":"text"}, "language": {"type":"text"}, "content": {"type":"text"} } } } } client = elasticsearch.Elasticsearch("localhost:9200") client.indices.create(index='novels', body=mapping)
実行すると下記URLにnovelsというindexが出来ている。
indexの存在チェック
client.indices.exists(index="novels") #=> True client.indices.exists(index="hoge") #=> False
indexの削除
client.indices.delete('novels')
データの登録
落としてきた小説の中から黄色い壁紙を登録する。
一意に設定される _idフィールドについてはについてはファイル名の拡張子以外の部分を登録しておく。おそらくこれでproject butenberg内ではユニークになるはず。
_idに一意になる値を入れておくと再度データを登録する際に重複が発生しなくなる。
import os path = 'novels/1952-0.txt' book = file2dic(path) _id = os.path.splitext(os.path.basename(path))[0] client = elasticsearch.Elasticsearch("localhost:9200") client.index(index='novels', doc_type='_doc', id=_id, body=book)
これでデータが登録された。
下記でデータが参照できる。
http://localhost:9200/novels/_doc/1952-0
データの検索
3ドキュメントを登録した状態で検索をかけてみる。
result = client.search( index='novels', body={'query': {'match': {'title': 'yellow'}}}) hits = result['hits'] first_doc = hits['hits'][0] print('ヒット数 : %s' % hits['total']) print('ID : %s' % first_doc['_id']) print('タイトル : %s' % first_doc['_source']['title'])
実行結果
ヒット数 : 1 ID : 1952-0 タイトル : The Yellow Wallpaper
削除
登録したデータの削除を行う。
ID指定の場合。
client.delete(index='novels', doc_type='_doc', id='1952-0')
クエリで指定する場合。
client.delete_by_query( index='novels', body={'query': {'match': {'title': 'picture'}}})
全件削除
client.delete_by_query( index='novels', body={"query": {"match_all": {}}})
bulk insert
一度に大量のドキュメントを登録する場合は個別に処理すると時間がかかるのでbulk insertでまとめて登録する。
from elasticsearch import helpers def create_documents(): for fname in os.listdir('novel'): path = os.path.join(['novels', fname]) book = file2dic(path) yield { "_index": 'novels', "_id": os.path.splitext(fname)[0], "_source": book } elasticsearch.helpers.bulk( client, actions=create_documents, chunk_size=1000)
parallel bulk
bulkでも遅い場合は並列実行版のparallel_bulkも検討する。
list(elasticsearch.helpers.parallel_bulk( client, actions=data, thread_count=multiprocessing.cpu_count(), chunk_size=1000))
改定履歴
Author: Masato Watanabe, Date: 2019-03-08, 記事投稿