栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Elasticsearch批量操作文档

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Elasticsearch批量操作文档

Elasticsearch系列文档

这个系列的文章会基于Java描述Elasticsearch中主要API的用法,其中会加入一些自己的理解,如有错误之处还请不宁赐教,非常感谢。
已经发布的文章在笔者有所感悟后还会不断更新,以期记录自己学习Elasticsearch的过程。


Elasticsearch批量操作文档
  • Elasticsearch系列文档
  • 前言
  • 1 创建请求
  • 2 执行请求
  • 3 请求响应
  • 4 批量处理器


前言

JavaAPI下的Elasticsearch用法。

1 创建请求

可以添加不同类型的子请求到BulkRequest 中。

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3")); 
request.add(new UpdateRequest("posts", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")  
        .source(XContentType.JSON,"field", "baz"));

批量操作只支持json和键值对的形式。

2 执行请求
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
3 请求响应

可以遍历bulkResponse获取每一个子操作的返回结果。

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    switch (bulkItemResponse.getOpType()) {
    case INDEX:    
    case CREATE:
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:   
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:   
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

可以快速判断批量操作中是否存在失败的子操作。

if (bulkResponse.hasFailures()) { 

}

当然可以遍历bulkResponse获取错误的详情。

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { 
        BulkItemResponse.Failure failure =
                bulkItemResponse.getFailure(); 
    }
}
4 批量处理器

Elasticsearch说它提供了一个简单的批量处理器来简化批量处理API,他主要由listener 和bulkProcessor两部分构成。但我并不觉得它有多简单。BulkProcessor.Listener可以在处理之前、结束或失败时被调用到。

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener, "bulk-processor-name").build(); 

在创建BulkProcessor 之前,可以使用BulkProcessor.Builder对批量操作的执行方式进行配置。
BulkActions和BulkSize可以设置批次的大小,默认分别为1000和5Mb。
ConcurrentRequests设置并行数目,默认为1,设为0表示一次只执行一个子操作。
FlushInterval设置批量数据刷入磁盘的间隔时间,默认不设置。
BackoffPolicy设置重试策略。如下的TimeValue.timeValueSeconds(1L), 3)表示等1s之后重试最多3次。

BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener, "bulk-processor-name");
builder.setBulkActions(500); 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0); 
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); 
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); 

之后就可以将子请求添加到bulkProcessor中了。

IndexRequest one = new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

每个批量请求都会调用BulkProcessor.Listener。

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) { 
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        logger.error("Failed to execute bulk", failure); 
    }
};

最后还需要关闭bulkProcessor。使用awaitClose方法会等待一段时间后关闭,如果关闭时所有批量请求都完成了,则terminated 返回true,否则返回false。而使用close方法会立即关闭bulkProcessor。

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); 
bulkProcessor.close();

感觉比原本的批量API要复杂了不少。尚不清楚这个BulkProcessor 比起原本的异步批量API有什么优势。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/874353.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号