这个系列的文章会基于Java描述Elasticsearch中主要API的用法,其中会加入一些自己的理解,如有错误之处还请不宁赐教,非常感谢。
已经发布的文章在笔者有所感悟后还会不断更新,以期记录自己学习Elasticsearch的过程。
Elasticsearch批量操作文档
- Elasticsearch系列文档
- 前言
- 1 创建请求
- 2 执行请求
- 3 请求响应
- 4 批量处理器
前言
JavaAPI下的Elasticsearch用法。
1 创建请求可以添加不同类型的子请求到BulkRequest 中。
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2")
.doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")
.source(XContentType.JSON,"field", "baz"));
批量操作只支持json和键值对的形式。
2 执行请求BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);3 请求响应
可以遍历bulkResponse获取每一个子操作的返回结果。
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}
可以快速判断批量操作中是否存在失败的子操作。
if (bulkResponse.hasFailures()) {
}
当然可以遍历bulkResponse获取错误的详情。
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure =
bulkItemResponse.getFailure();
}
}
4 批量处理器
Elasticsearch说它提供了一个简单的批量处理器来简化批量处理API,他主要由listener 和bulkProcessor两部分构成。但我并不觉得它有多简单。BulkProcessor.Listener可以在处理之前、结束或失败时被调用到。
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
}
};
BulkProcessor bulkProcessor = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener, "bulk-processor-name").build();
在创建BulkProcessor 之前,可以使用BulkProcessor.Builder对批量操作的执行方式进行配置。
BulkActions和BulkSize可以设置批次的大小,默认分别为1000和5Mb。
ConcurrentRequests设置并行数目,默认为1,设为0表示一次只执行一个子操作。
FlushInterval设置批量数据刷入磁盘的间隔时间,默认不设置。
BackoffPolicy设置重试策略。如下的TimeValue.timeValueSeconds(1L), 3)表示等1s之后重试最多3次。
BulkProcessor.Builder builder = BulkProcessor.builder(
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
listener, "bulk-processor-name");
builder.setBulkActions(500);
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
builder.setConcurrentRequests(0);
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
之后就可以将子请求添加到bulkProcessor中了。
IndexRequest one = new IndexRequest("posts").id("1")
.source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
每个批量请求都会调用BulkProcessor.Listener。
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request,
Throwable failure) {
logger.error("Failed to execute bulk", failure);
}
};
最后还需要关闭bulkProcessor。使用awaitClose方法会等待一段时间后关闭,如果关闭时所有批量请求都完成了,则terminated 返回true,否则返回false。而使用close方法会立即关闭bulkProcessor。
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
bulkProcessor.close();
感觉比原本的批量API要复杂了不少。尚不清楚这个BulkProcessor 比起原本的异步批量API有什么优势。



