When the data volume is small — say, consuming records from Kafka and writing them to Elasticsearch — consuming and inserting one document at a time works fine. But as the volume grows, we need batch inserts to improve indexing throughput.
Official docs: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request
How it works: the bulk API executes multiple index or delete operations in a single request, which can greatly improve indexing performance.
The bulk body uses a newline-delimited format:
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
As the layout shows, every two lines make up one operation: an action/metadata line followed by its (optional) source line.
action: the type of bulk operation; the supported actions are index, create, update, and delete:
create: creates the document if it does not exist; if it already exists the action fails, without affecting the other actions in the request
index: creates the document if it does not exist, otherwise overwrites it
update: updates an existing document
delete: deletes a document
metadata: the metadata for the action — the target index and the document id
request body: the document data for the action; it must not be pretty-printed (each JSON object stays on a single line) and is omitted for delete. Set the Content-Type header to application/json (application/x-ndjson also works)
\n: every line, including the last one, must end with a newline character
Practice
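The newline-delimited format described above can be assembled programmatically. A minimal sketch (names and sample documents are illustrative):

```python
import json

# Build the newline-delimited bulk body from (action, document) pairs,
# following the action_and_meta_data / optional_source line pairing.
pairs = [
    ({"index": {"_index": "my_store"}}, {"name": "tom", "city": "cs"}),
    ({"index": {"_index": "my_store"}}, {"name": "nick", "city": "sz"}),
]

lines = []
for action, doc in pairs:
    lines.append(json.dumps(action))  # action_and_meta_data line
    lines.append(json.dumps(doc))     # optional_source line
body = "\n".join(lines) + "\n"        # the last line must also end with \n

print(body)
```

Note that json.dumps keeps each object on a single line, which is exactly what the API requires.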
We can write the operations into a text file and send it to the server with curl.
A requests file looks like this:
vi requests.json
{"index":{"_index":"my_store","_type":"_doc"}}
{"name":"tom","city":"cs"}
{"index":{"_index":"my_store","_type":"_doc"}}
{"name":"nick","city":"sz"}
{"index":{"_index":"my_store","_type":"_doc"}}
{"name":"rose","city":"shanghai"}
{"index":{"_index":"my_store","_type":"_doc"}}
{"name":"danni","city":"bj"}
Pretty-printed here only for readability (the actual request must keep each object on one line):
{
"index": {
"_index": "my_store",
"_type": "_doc"
}
}
{
"name": "tom",
"city": "cs"
}
{
"index": {
"_index": "my_store",
"_type": "_doc"
}
}
{
"name": "nick",
"city": "sz"
}
{
"index": {
"_index": "my_store",
"_type": "_doc"
}
}
{
"name": "rose",
"city": "shanghai"
}
{
"index": {
"_index": "my_store",
"_type": "_doc"
}
}
{
"name": "danni",
"city": "bj"
}
Or send the operations inline with curl:
curl -X POST "localhost:9200/_bulk?pretty" -H 'Content-Type: application/json' -d'
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
'
Explanation:
_index: the name of the target index
_type: the mapping type (deprecated since Elasticsearch 7; use _doc)
_id: the document id; if omitted, Elasticsearch generates a random id
Run it from the console:
curl -s -u elastic:pass -H "Content-Type: application/json" -X POST 'zz.cn:9200/_bulk' --data-binary @requests.json; ## or: curl -u elastic:pass -H "Content-Type: application/json" -X POST 'zz.cn:9200/_bulk' --data-binary @cc;
The response:
The bulk response is itemized as well: every action gets its own response entry telling you whether that action succeeded or failed.
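A minimal sketch of checking those per-action results. The response dict below is a hand-written illustration of the shape the API returns, not real output:

```python
# Illustrative bulk response: a top-level "errors" flag plus one entry
# per action under "items", each keyed by the action type.
resp = {
    "took": 30,
    "errors": True,
    "items": [
        {"index": {"_index": "test", "_id": "1", "status": 201}},
        {"index": {"_index": "test", "_id": "2", "status": 400,
                   "error": {"type": "mapper_parsing_exception"}}},
    ],
}

failed = []
if resp["errors"]:  # quick check: did any action fail?
    for item in resp["items"]:
        (op, result), = item.items()  # each item wraps exactly one action
        if result.get("status", 500) >= 300:
            failed.append((op, result["_id"], result["error"]["type"]))

print(failed)
```

Checking the top-level errors flag first avoids scanning the items list in the common all-success case.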
The same request with the official Python client:
from elasticsearch import Elasticsearch

# connect with HTTP basic auth ("xx" is a password placeholder)
es = Elasticsearch("zz.cn", http_auth=("elastic", "xx"))

# alternating action/metadata and source entries, same as the curl body
body_data = [
    {"index": {"_index": "my_store", "_type": "_doc"}},
    {"name": "tom", "city": "cs"},
    {"index": {"_index": "my_store", "_type": "_doc"}},
    {"name": "nick", "city": "sz"},
    {"index": {"_index": "my_store", "_type": "_doc"}},
    {"name": "rose", "city": "shanghai"},
    {"index": {"_index": "my_store_01", "_type": "_doc"}},
    {"name": "danni", "city": "bj"}
]
resp = es.bulk(body=body_data)
print(resp)
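The Python client also ships a higher-level helper, helpers.bulk, which assembles the newline-delimited body for you from plain action dicts. A minimal sketch; host, credentials, and index name are placeholders from the examples above, and the live call is commented out since it needs a running cluster:

```python
def gen_actions(docs, index="my_store"):
    """Yield one action dict per document for helpers.bulk."""
    for doc in docs:
        # one dict per operation: metadata fields plus the document source
        yield {"_index": index, "_source": doc}

docs = [
    {"name": "tom", "city": "cs"},
    {"name": "nick", "city": "sz"},
]
actions = list(gen_actions(docs))
print(actions)

# Against a live cluster this would be:
# from elasticsearch import Elasticsearch, helpers
# es = Elasticsearch("zz.cn", http_auth=("elastic", "xx"))
# ok, errors = helpers.bulk(es, actions)  # returns (success_count, error list)
```

Because gen_actions is a generator, helpers.bulk can stream large datasets in chunks without building the whole body in memory.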