ES复合查询的实现
ES查询语句:
get tableName/_search
{
"query": {
"bool": {
"must_not": {
"bool": {#判断条件1#}
},
"must": {
"bool": {#判断条件2#}
},
"should": {
"bool": {#判断条件3#}
}
}
}
}
含义:不满足条件1(must_not),必须满足条件2(must),最好满足条件3(should:在已有 must 子句时只影响相关性评分,不是必须满足;添加 "minimum_should_match": 1 后至少要满足一个 should 子句,可实现 or 的功能)
java语句:
// Template for a compound bool query: one nested bool query per clause.
// Each nested query is created empty here; fill in the real conditions before use.
BoolQueryBuilder mustNotClause = QueryBuilders.boolQuery(); // must_not conditions
BoolQueryBuilder mustClause = QueryBuilders.boolQuery();    // must conditions
BoolQueryBuilder shouldClause = QueryBuilders.boolQuery();  // should conditions

// Combine the three clauses into the root bool query.
BoolQueryBuilder rootQuery = QueryBuilders.boolQuery()
        .must(mustClause)
        .should(shouldClause)
        .mustNot(mustNotClause);

// Wrap the query in a search request against the target index.
SearchSourceBuilder source = new SearchSourceBuilder().query(rootQuery);
SearchRequest searchRequest = new SearchRequest("table_name").source(source);

try {
    // Synchronous search call; "client" is the RestHighLevelClient supplied by the surrounding code.
    SearchResponse search = client.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
    e.printStackTrace();
}
ES写sql实现查询
查询语句:
post /_sql?format=txt
{
"query": "select * from twitter limit 10"
}
post /_sql/translate
{
"query": "select * from twitter where author_id>40"
}
返回结果(SQL 翻译得到的 DSL 查询):
{
"size": 1000,
"query": {
"range": {
"author_id": {
"from": 40,
"to": null,
"include_lower": false,
"include_upper": false,
"boost": 1
}
}
},
"_source": false,
"fields": [
{
"field": "author_id"
},
{
"field": "author_name"
},
{
"field": "created_at",
"format": "strict_date_optional_time_nanos"
},
{
"field": "message"
}
],
"sort": [
{
"_doc": {
"order": "asc"
}
}
]
}
java 实现
增加依赖:ES 插件 bboss,参见文档《Elasticsearch SQL ORM操作 - Elasticsearch Bboss》(bbossgroups.com)
批量操作的实现:增加配置类
package com.ppl.esdemo.config;
/**
 * Spring configuration that exposes the Elasticsearch {@link RestHighLevelClient} and a
 * {@link BulkProcessor} built on top of it.
 *
 * <p>The connection target is read from the {@code elasticsearch.host} and
 * {@code elasticsearch.port} properties.
 */
@Configuration
public class EsConfig {

    /** Elasticsearch host name, injected from {@code elasticsearch.host}. */
    @Value("${elasticsearch.host}")
    private String host;

    /** Elasticsearch HTTP port, injected from {@code elasticsearch.port}. */
    @Value("${elasticsearch.port}")
    private int port;

    /** Lazily created client instance, reused by every call to {@link #client()}. */
    private RestHighLevelClient client;

    /**
     * Returns the high-level REST client, creating it on first use.
     *
     * @return the client connected over plain HTTP to the configured host and port
     */
    @Bean
    public RestHighLevelClient client() {
        if (client == null) {
            client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost(host, port, "http")));
        }
        return client;
    }

    /**
     * Builds the {@link BulkProcessor} used to batch index/update/delete requests.
     *
     * <p>A batch is flushed when it reaches 10000 actions, 8 MB, or 10 seconds, whichever
     * comes first. Up to 8 batches may be in flight concurrently, and a rejected batch is
     * retried up to 3 times with a constant 1 second delay.
     *
     * @return the configured bulk processor
     */
    @Bean
    public BulkProcessor bulkProcessor() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                // Hook called before a batch is sent; request.numberOfActions() gives its size.
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (!response.hasFailures()) {
                    // Batch fully succeeded; response.getTook() gives the elapsed time.
                    return;
                }
                // Report only the first failed item so a bad batch does not flood the output.
                for (BulkItemResponse item : response.getItems()) {
                    if (item.isFailed()) {
                        System.err.println("Bulk [" + executionId + "] item failed: "
                                + item.getFailureMessage());
                        break;
                    }
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // The whole batch failed: surface the ids of the affected documents.
                List<DocWriteRequest<?>> requests = request.requests();
                List<String> esIds = requests.stream()
                        .map(DocWriteRequest::id)
                        .collect(Collectors.toList());
                System.err.println("Bulk [" + executionId + "] failed for ids " + esIds);
                failure.printStackTrace();
            }
        };

        BulkProcessor.Builder builder = BulkProcessor.builder(
                (bulkRequest, bulkListener) ->
                        client().bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkListener),
                listener);

        // Flush after 10000 queued actions.
        builder.setBulkActions(10000);
        // Flush once the queued requests reach 8 MB.
        builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
        // Flush at least every 10 seconds.
        builder.setFlushInterval(TimeValue.timeValueSeconds(10));
        // Number of bulk requests allowed to execute concurrently.
        builder.setConcurrentRequests(8);
        // Retry policy: constant 1 second delay, at most 3 retries.
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
        return builder.build();
    }
}
使用 bulkProcessor 缓存增删改请求,累积到设定的阈值(条数、大小或时间间隔)后批量发送
基本增删改的实现
package com.ppl.api.serrvice.impl;
@Service
public class TwitterServiceImpl implements TwitterService {
@Autowired
private RestHighLevelClient client;
@Autowired
private BulkProcessor bulkProcessor;
private final String ES_INDEX = "twitter";
@Override
public boolean insertList(List list) {
list.forEach(twitter -> {
insertOne(twitter);
});
return true;
}
@Override
public boolean updateList(List list) {
list.forEach(twitter -> {
updateOne(twitter);
});
return true;
}
@Override
public boolean deleteList(List list) {
list.forEach(twitter -> {
deleteOne(twitter);
});
return true;
}
@Override
public Page selectList(SearchSourceBuilder searchSourceBuilder) {
SearchRequest request = new SearchRequest(ES_INDEX);
request.source(searchSourceBuilder);
Page page = new Page<>();
ArrayList list = new ArrayList<>();
page.setRecords(list);
if(searchSourceBuilder==null){
return page;
}
try {
SearchResponse search = client.search(request, RequestOptions.DEFAULT);
SearchHits hits = search.getHits();
long value = hits.getTotalHits().value;
page.setTotal(value);
SearchHit[] resHits = hits.getHits();
for (SearchHit resHit : resHits) {
Twitter twitter = JSON.parseObject(resHit.getSourceAsString(), Twitter.class);
list.add(twitter);
}
} catch (IOException e) {
e.printStackTrace();
}
return page;
}
@Override
public Twitter selectOne(SearchSourceBuilder searchSourceBuilder) {
SearchRequest request = new SearchRequest(ES_INDEX);
request.source(searchSourceBuilder);
try {
SearchResponse search = client.search(request, RequestOptions.DEFAULT);
SearchHits hits = search.getHits();
SearchHit[] resHits = hits.getHits();
for (SearchHit resHit : resHits) {
Twitter twitter = JSON.parseObject(resHit.getSourceAsString(), Twitter.class);
return twitter;
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
@Override
public boolean insertOne(Twitter twitter) {
IndexRequest request = new IndexRequest(ES_INDEX);
request.id(getId(twitter));
request.source(JSON.toJSONString(twitter), XContentType.JSON);
bulkProcessor.add(request);
return true;
}
@Override
public boolean updateOne(Twitter twitter) {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(ES_INDEX);
updateRequest.id(getId(twitter));
updateRequest.doc(JSON.toJSONString(twitter), XContentType.JSON);
bulkProcessor.add(updateRequest);
return true;
}
@Override
public boolean deleteOne(Twitter twitter) {
DeleteRequest deleteRequest = new DeleteRequest(ES_INDEX);
deleteRequest.id(getId(twitter));
bulkProcessor.add(deleteRequest);
return true;
}
private String getId(Twitter twitter){
String id="";
id=DigestUtils.md5Hex(twitter.getAuthorId()+twitter.getAuthorName());
return id;
}
}



