elasticsearch-rubyでPoint in time APIを使って1万件以上取得する
バージョン
- Elasticsearch: 7.10.1
1万件の制限
Elasticsearchのデフォルトでは通常の search
で取得可能なドキュメントは1万件でそれ以上はPIT (Point In Time)を使って取得する必要があります。
Scroll APIというのもあるようですがこちらは現在非推奨になっているのでPITとsearch_afterパラメータを使って取得するのが良いようです。
Point in time API - Elasticsearch Reference [7.11] - Elastic
ちなみに1万件の制限については index.max_result_window
で変更できます。
Index modules - Elasticsearch Reference [7.11] - Elastic
サンプル
Gemfile
gem 'elasticsearch', '~> 7'
gem 'elasticsearch-xpack', '~> 7'
main.rb
#!/usr/bin/env ruby
Bundler.require
client = Elasticsearch::Client.new(
url: 'http://127.0.0.1:9200',
log: true,
)
pit_id = nil
search_after = nil
begin
# スナップショットを作成(1分間だけ保持する)
response = client.xpack.open_point_in_time(index: 'items', keep_alive: '1m')
puts JSON.pretty_generate(response)
pit_id = response['id']
# 一度に取得する件数
size = 10000
while true
search_opts = {
body: {
size: size,
query: {
match: {
'user_id': 123,
}
},
pit: {
id: pit_id,
keep_alive: '1m', # スナップショットを1分間延長する
},
sort: [
{ 'title': 'desc' },
{ 'id': 'desc' },
],
},
}
search_opts[:body][:search_after] = search_after if search_after
response = client.search(search_opts)
# 各リクエスト毎にIDが変化する可能性があるので最新のIDを設定する
pit_id = response['pit_id']
# 次に読み込む位置を設定
search_after = response['hits']['hits'].last['sort']
# 取得できた件数がsizeより少ないなら最後まで読み込んだ
fetched_num = response['hits']['hits'].size
if fetched_num < size
puts "全て読み込みました。"
break
end
end
ensure
# スナップショットを削除
if pit_id
response = client.xpack.close_point_in_time(body: JSON.generate({id: pit_id}))
puts JSON.pretty_generate(response)
end
end
まとめ
elasticsearch-ruby
で X-Pack
の使い方が分かりにくかったですがライブラリの実装を読むことで理解できました。
elastic/elasticsearch-ruby: Ruby integrations for Elasticsearch
更新履歴
- 5d67274 2021-12-18T09:57:47+09:00 記事のディレクトリ構造を変更