Elasticsearch的部分操作(单条插入批量插入及根据条件删除记录),转java代码的示例如下:
public void writeDataToES(List> dataList, String indexName) { // 将数据添加到 bulkProcessor 中 int i =0; int lastOneIndex= dataList.size()-1; LOG.info("--the datalist will be added to the ws, and it's size="+dataList.size()); for (Map rowMap : dataList) { IndexRequest sourceRequest = new IndexRequest(indexName); sourceRequest.source(rowMap); bulkProcessor.add(sourceRequest); if(i==lastOneIndex){ LOG.info("###success to add the last one source map in batch, it's mapInfo="+rowMap); } i++; } bulkProcessor.flush(); } public void writeToES(Map data, String indexName) throws IOException { // 将数据添加到 bulkProcessor 中 LOG.info("--the datalist will be added to the ws, it's mapInfo="+data); IndexRequest sourceRequest = new IndexRequest(indexName); sourceRequest.source(data); client.index(sourceRequest, RequestOptions.DEFAULT); } //删除指定查询条件的记录. public long deleteByTermQuery(String indexName, String termField, String termValue) { //参数为索引名,可以不指定,可以一个,可以多个 DeleteByQueryRequest request = new DeleteByQueryRequest(indexName); // 更新时版本冲突 request.setConflicts("proceed"); // 设置查询条件,第一个参数是字段名,第二个参数是字段的值 request.setQuery(new TermQueryBuilder(termField, termValue)); // 更新最大文档数 request.setSize(2999); // 批次大小 request.setBatchSize(50); // 并行 request.setSlices(2); // 使用滚动参数来控制“搜索上下文”存活的时间 request.setScroll(Timevalue.timevalueMinutes(10)); // 超时 request.setTimeout(Timevalue.timevalueMinutes(2)); // 刷新索引 request.setRefresh(true); try { BulkByScrollResponse response = client.deleteByQuery(request, RequestOptions.DEFAULT); return response.getStatus().getUpdated(); } catch (IOException e) { LOG.error("!!!deleteByTermQuery error", e); } return -1; }
上一篇 设计模式 - 调停者&&门面&&责任链&&策略
下一篇 Kafka客户端实践与原理
版权所有 (c)2021-2022 MSHXW.COM
ICP备案号:晋ICP备2021003244-6号