iMind Developers Blog

iMind開発者ブログ

PythonでElasticsearchの操作

概要

PythonからElasticsearchにデータの登録、検索、削除を行う。

バージョン情報

  • Python 3.6.8 (elasticsearch==6.3.1)
  • Elasticsarch 6.5.4

導入

PythonのElasticsearchクライアントを入れておく。

$ pip install elasticsearch

ElasticsarchはDockerで立ち上げることにしておく。

下記のようにdocker-compose.ymlを記述。

version: "3"
services:

    elasticsearch1:
        image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.5.4
        container_name: elasticsearch1
        hostname: elasticsearch1
        environment:
            discovery.type: single-node
            ES_JAVA_OPTS: "-Xms4g -Xmx8g"
        ulimits:
            memlock:
                soft: -1
                hard: -1
        volumes:
            - './es_data:/usr/share/elasticsearch/data'
        ports:
            - '9200:9200'
            - '9300:9300'

起動

$ docker-compose up

接続確認

$ curl http://localhost:9200/ 

サンプルデータ

例としてProject Gutenbergからテキスト形式の小説を3冊ほど持ってくる。

https://www.gutenberg.org/browse/scores/top

本例では黄色い壁紙、ドリアングレイの肖像、ピュグマリオンの3冊を持ってきた。そういう気分の日もある。

ダウンロードしたテキストはnovelsというディレクトリに入れておく。

Project Gutenbergの書籍には本文の他に下記などの情報が付加されている。

  • Title
  • Author
  • Release Date
  • Last Updated
  • Language

下記のようなコードでdictionaryに変換する。

def file2dic(file_path):
    book = {'content': ''}
    with open(file_path, 'rt') as f:
        state = 'header'
        for line in f:
            if state == 'header':
                if line.startswith('Title: '):
                    book['title'] = line.split(':')[1].strip()
                if line.startswith('Author: '):
                    book['author'] = line.split(':')[1].strip()
                elif line.startswith('Release Date: '):
                    book['release_date'] = line.split(':')[1].strip()
                elif line.startswith('Last Updated: '):
                    book['last_updated'] = line.split(':')[1].strip()
                elif line.startswith('Language: '):
                    book['language'] = line.split(':')[1].strip()
                elif line.startswith('*** START '):
                    state = 'body'
            elif state == 'body':
                if line.startswith('*** END '):
                    break
                book['content'] += line
    return book

indexの生成

まずはスキーマ定義から。release_dateとかはtimestampにもできるけどパース書くの面倒なのでtextで入れておく。細かい指定は割愛。

import elasticsearch

mapping = {
    "mappings" : {
        "_doc" : {
            "properties" : {
                "author": {"type":"text"},
                "release_date": {"type":"text"},
                "last_updated": {"type":"text"},
                "language": {"type":"text"},
                "content": {"type":"text"}
            }
        }
    }
}
client = elasticsearch.Elasticsearch("localhost:9200")
client.indices.create(index='novels', body=mapping)

実行すると下記URLにnovelsというindexが出来ている。

http://localhost:9200/novels

indexの存在チェック

client.indices.exists(index="novels")
  #=> True

client.indices.exists(index="hoge")
  #=> False

indexの削除

client.indices.delete('novels')

データの登録

落としてきた小説の中から黄色い壁紙を登録する。

一意に設定される _idフィールドについてはについてはファイル名の拡張子以外の部分を登録しておく。おそらくこれでproject butenberg内ではユニークになるはず。

_idに一意になる値を入れておくと再度データを登録する際に重複が発生しなくなる。

import os
path = 'novels/1952-0.txt'
book = file2dic(path)
_id = os.path.splitext(os.path.basename(path))[0]

client = elasticsearch.Elasticsearch("localhost:9200")
client.index(index='novels', doc_type='_doc', id=_id, body=book)

これでデータが登録された。

下記でデータが参照できる。

http://localhost:9200/novels/_doc/1952-0

データの検索

3ドキュメントを登録した状態で検索をかけてみる。

result = client.search(
        index='novels',
        body={'query': {'match': {'title': 'yellow'}}})
hits = result['hits']
first_doc = hits['hits'][0]

print('ヒット数 : %s' % hits['total'])
print('ID : %s' % first_doc['_id'])
print('タイトル : %s' % first_doc['_source']['title'])

実行結果

ヒット数 : 1
ID : 1952-0
タイトル : The Yellow Wallpaper

削除

登録したデータの削除を行う。

ID指定の場合。

client.delete(index='novels', doc_type='_doc', id='1952-0')

クエリで指定する場合。

client.delete_by_query(
        index='novels', 
        body={'query': {'match': {'title': 'picture'}}})

全件削除

client.delete_by_query(
        index='novels',
        body={"query": {"match_all": {}}})

bulk insert

一度に大量のドキュメントを登録する場合は個別に処理すると時間がかかるのでbulk insertでまとめて登録する。

from elasticsearch import helpers

def create_documents():
    for fname in os.listdir('novel'):
        path = os.path.join(['novels', fname])
        book = file2dic(path)
        yield {
            "_index": 'novels',
            "_id": os.path.splitext(fname)[0],
            "_source": book
        }

elasticsearch.helpers.bulk(
        client,
        actions=create_documents,
        chunk_size=1000)

parallel bulk

bulkでも遅い場合は並列実行版のparallel_bulkも検討する。

list(elasticsearch.helpers.parallel_bulk(
        client,
        actions=data,
        thread_count=multiprocessing.cpu_count(),
        chunk_size=1000))

改定履歴

Author: Masato Watanabe, Date: 2019-03-08, 記事投稿