Fork me on GitHub

Elasticsearch中数据写入总结

目录

  • 背景
  • 第一部分 Search 接口
  • 第二部分 Scroll 接口
  • 第三部分 Scan 接口
  • 第四部分 总结
  • 参考文献及资料

背景

https://www.elastic.co/guide/en/elasticsearch/reference/7.16/docs-index_.html

第一部分 写入原理

第二部分 单条写入

Creates or updates a document in an index.

python index接口

第二部分 高级API

2.1 bulk接口

2.2 streaming_bulk 接口

2.3 parallel_bulk接口

2.4 压测

最近的爬虫项目里涉及往ES中大量写入数据,因此做了一些调研。
总而言之,py-elasticsearch库推荐使用helper.bulk相关函数进行批量写入(实际是批量执行,不仅限于写入),而bulk有三个相关函数:

  • parallel_bulk():并发批量执行;
  • streaming_bulk():流式批量执行;
  • bulk():在源码里可以看到,本质上是对streaming_bulk()的封装,返回了统计结果,方便处理。

* 先附上三个函数的代码范例

官方文档:https://elasticsearch-py.readthedocs.io/en/master/helpers.html#example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from elasticsearch.helpers import streaming_bulk, parallel_bulk, bulk, scan
def generate_actions():
for doc in doc_list:
yield doc

# 1. parallel_bulk(还可以用类似streaming_bulk的for循环)
deque(parallel_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), chunk_size=3000, thread_count=32), maxlen=0)

# 2. streaming_bulk
for ok, action in streaming_bulk(client=self.es, index=index, doc_type="doc", actions=generate_actions(), max_retries=5):
pass

# 3. bulk
bulk(client=self.es, doc_type="doc", index=index, actions=generate_actions())

一次性写入5w条数据的耗时测试(多次实验)

1
2
3
4
C:\ProgramData\Anaconda3\python.exe D:/pyelasticsearch/pyelasticsearch/tests/bulkTest.py
Function [parallel_bulk_test] cost time is 6.17
Function [streaming_bulk_test] cost time is 7.27
Function [bulk_test] cost time is 7.50
设备 parallel_bulk streaming_bulk bulk 逐条写入
i5-7200u的笔记本 4-5秒 15秒 15秒+ 2000秒+
8核32线程64G的服务器 2秒 4秒 4秒 50秒

结论:如果对写入耗时要求不高,用bulk()即可;如果需要判断每一条写入的返回状态,用streaming_bulk();追求速度的话,用parallel_bulk()。

参考文献及资料

1、

本文标题:Elasticsearch中数据写入总结

文章作者:rong xiang

发布时间:2022年06月28日 - 13:06

最后更新:2022年10月25日 - 23:10

原始链接:https://zjrongxiang.github.io/posts/b637c822/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

0%