栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

elasticsearch bulk

elasticsearch bulk

背景

如果在数据量小的情况下,比如我们消费Kafka的数据到ElasticSearch,逐条消费和逐条写入,这样是没问题的。但是随着数据量越来越大的话,为了提高插入效率,我们就需要批量插入了。

官网

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#docs-bulk-api-request

原理解析

bulk api可以在单个请求中一次执行多个索引或者删除操作,使用这种方式可以极大的提升索引性能。

bulk的语法格式是:

action and meta_data n
optional source n
 
action and meta_data n
optional source n
 
action and meta_data n
optional source n

从上面能够看到,两行数据 构成了一次操作.

action: 批量操作的类型,有增、删、改、查操作

    create: 文档不存在时候,创建文档,存在时,会报错,但是不影响其它数据操作index: 文档不存在创建,存在覆盖update: 更新文档delete: 删除文档

metadata:元数据,执行索引,文档id信息request body:操作具体的数据,不能使用json格式化,删除操作不需要。我们需要设置的它的Content-Type为application/jsonn:每行必须是回车换行 实践

我们可以将我们的操作直接写入到一个文本文件中,然后使用curl命令把它发送到服务端:

一个requests文件内容如下:

vi requests.json

{"index":{"_index":"my_store","_type":"_doc"}}
{"name":"tom","city":"cs"}
{"index":{"_index":"my_store","_type":"_doc"}}
{"name":"nick","city":"sz"}
{"index":{"_index":"my_store","_type":"_doc"}}
{"name":"rose","city":"shanghai"}
{"index":{"_index":"my_store","_type":"_doc"}}
{"name":"danni","city":"bj"}

格式化如下:

{
  "index": {
    "_index": "my_store",
    "_type": "_doc"
  }
}
{
  "name": "tom",
  "city": "cs"
}
{
  "index": {
    "_index": "my_store",
    "_type": "_doc"
  }
}
{
  "name": "nick",
  "city": "sz"
}
{
  "index": {
    "_index": "my_store",
    "_type": "_doc"
  }
}
{
  "name": "rose",
  "city": "shanghai"
}
{
  "index": {
    "_index": "my_store",
    "_type": "_doc"
  }
}
{
  "name": "danni",
  "city": "bj"
}

或者:

curl -X POST "localhost:9200/_bulk?pretty" -H 'Content-Type: application/json' -d'
{ "index" : { "_index" : "test", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_id" : "2" } }
{ "create" : { "_index" : "test", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
'

解释: 

_index: 表示需要插入的索引的名_type: 表示类型_id: 如果不指定id默认会新增随机id

Console执行命令:

curl -s -u elastic:pass -H "Content-Type: application/json" -PUT  'zz.cn:9200/_bulk'   --data-binary @requests.json;

## 或者
curl  -u elastic:pass  -H "Content-Type: application/json" -PUT  'zz.cn:9200/_bulk'   --data-binary @cc;

执行返回:
   bulk请求的返回操作的结果也是批量的,每一个action都会有具体的应答体,来告诉你当前action是成功执行还是失败 。

Python代码示例
from elasticsearch import Elasticsearch
from elasticsearch import helpers


es = Elasticsearch("zz.cn",http_auth=("elastic","xx"))

body_data=[
    {"index":{"_index":"my_store","_type":"_doc"}},
    {"name":"tom","city":"cs"},
    {"index":{"_index":"my_store","_type":"_doc"}},
    {"name":"nick","city":"sz"},
    {"index":{"_index":"my_store","_type":"_doc"}},
    {"name":"rose","city":"shanghai"},
    {"index":{"_index":"my_store_01","_type":"_doc"}},
    {"name":"danni","city":"bj"}
]


resp = es.bulk(body=body_data)
print(resp)


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734913.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号