条件查询、排序、去重
SearchSourceBuilder builder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if (ObjectUtil.isNotEmpty(apiDispatchRecordDto.getApiName())) {
boolQueryBuilder.must(QueryBuilders.wildcardQuery("apiName.keyword", "*" + apiDispatchRecordDto.getApiName()+ "*"));
}
if (ObjectUtil.isNotEmpty(apiDispatchRecordDto.getAppApiKey())) {
boolQueryBuilder.must(QueryBuilders.wildcardQuery("appApiKey", apiDispatchRecordDto.getAppApiKey()));
}
if (ObjectUtil.isNotEmpty(apiDispatchRecordDto.getAppName())) {
boolQueryBuilder.must(QueryBuilders.wildcardQuery("appName.keyword", "*"+apiDispatchRecordDto.getAppApiKey()+"*"));
}
if (ObjectUtil.isNotEmpty(apiDispatchRecordDto.getStart())&&ObjectUtil.isNotEmpty(apiDispatchRecordDto.getEnd())) {
boolQueryBuilder.must(QueryBuilders.rangeQuery("dispatchTime").gte(apiDispatchRecordDto.getStart()).lte(apiDispatchRecordDto.getEnd()));
}
builder.query(boolQueryBuilder);
//去重
builder.collapse(new CollapseBuilder("apiName.keyword"));
//排序
builder.sort("dispatchTime", SortOrder.DESC);
Map map = elasticsearchUtil.searchListData(KafkaConstant.OPEN_API_DISPATCH_RECORD_TOPIC, builder, pageSize, pageNum-1,
null, null, null);
List list = (List) map.get("data");
查询符合条件的数据量
SearchSourceBuilder builder = new SearchSourceBuilder();
CountRequest countRequest = new CountRequest();
//构造查询条件
builder.query(QueryBuilders.termQuery("apiResponseCode",code));
countRequest.indices(KafkaConstant.OPEN_API_DISPATCH_RECORD_TOPIC).query(QueryBuilders.termQuery("apiResponseCode",code));
return (int)restHighLevelClient.count(countRequest, RequestOptions.DEFAULT).getCount();
添加数据
ApiDispatchRecordEsBo apiDispatchRecordEsBo = new ApiDispatchRecordEsBo();
apiDispatchRecordEsBo.setApiId("1506101690120949761");
apiDispatchRecordEsBo.setApiName("物联网设备注册");
apiDispatchRecordEsBo.setApiResponseCode(404);
apiDispatchRecordEsBo.setAppApiKey("d0d8544326304087848934583a271fe3");
apiDispatchRecordEsBo.setApiResponseTime(444l);
apiDispatchRecordEsBo.setDispatchTime(new Date(1651373682));
JSONObject jsonObject = JSONObject.parseObject(JSON.toJSONString(apiDispatchRecordEsBo));
System.out.println(elasticsearchUtil.addData(jsonObject, "open_api_dispatch_record"));
工具类
package com.netintech.es.util;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import com.netintech.common.core.utils.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Component
public class ElasticsearchUtil {
@Autowired
private RestHighLevelClient restHighLevelClient;
public boolean createIndex(String index) throws IOException {
if (isIndexExist(index)) {
return false;
}
//1.创建索引请求
CreateIndexRequest request = new CreateIndexRequest(index);
//2.执行客户端请求
CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
return response.isAcknowledged();
}
public boolean deleteIndex(String index) throws IOException {
if (!isIndexExist(index)) {
return false;
}
//删除索引请求
DeleteIndexRequest request = new DeleteIndexRequest(index);
//执行客户端请求
AcknowledgedResponse delete = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
return delete.isAcknowledged();
}
public static void main(String[] args) {
}
public boolean isIndexExist(String index) throws IOException {
GetIndexRequest request = new GetIndexRequest(index);
boolean exists = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
return exists;
}
public String bulkPutIndex(List