环境:Spring Boot 2.6.3、Elasticsearch 6.6.2、JDK 1.8
依赖(pom.xml):
- org.projectlombok:lombok:1.18.4
- org.springframework.boot:spring-boot-starter-data-elasticsearch
- com.google.guava:guava:21.0
- com.alibaba:fastjson:1.2.59
application.yml 配置:
app:
  es:
    nodes: 192.168.116.200:9200,192.168.116.201:9200
EsConfig 配置类
package com.example.link.config;
import com.google.common.base.Splitter;
import com.google.common.primitives.Ints;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.nio.reactor.IOReactorException;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
/**
 * Spring configuration that builds the Elasticsearch {@link RestHighLevelClient} bean
 * from the node list and connection-pool size held in {@link ESProperties}.
 */
@Configuration
public class EsConfig {

    /** URI scheme used for every configured node. */
    private static final String SCHEME = "http";

    @Autowired
    private ESProperties esProperties;

    /**
     * Creates the high-level REST client for the nodes configured in {@code esProperties}.
     *
     * @return a client wired with the request timeouts and pooled connection manager set below
     * @throws IllegalArgumentException if the node list is empty or an entry is not {@code host:port}
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        // 1. Parse the configured "host:port" entries.
        List<HttpHost> httpHosts = parseHosts(esProperties.getNodes());
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));

        // 2. Request timeouts: 30 seconds to connect, 1 hour of socket inactivity.
        restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(30 * 1000)
                .setSocketTimeout(60 * 60 * 1000));

        // 3. Pooled async connection manager plus the default keep-alive strategy.
        restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
            try {
                PoolingNHttpClientConnectionManager connManager = new PoolingNHttpClientConnectionManager(
                        new DefaultConnectingIOReactor(IOReactorConfig.DEFAULT));
                connManager.setMaxTotal(esProperties.getConnectionMax());
                // NOTE(review): the per-route limit (1000) is larger than the default
                // total configured in ESProperties (200) - confirm this is intended.
                connManager.setDefaultMaxPerRoute(1000);
                httpClientBuilder.setConnectionManager(connManager);
            } catch (IOReactorException e) {
                // Fail fast instead of printing the stack trace and installing a null manager.
                throw new IllegalStateException(
                        "Failed to create the I/O reactor for the Elasticsearch HTTP client", e);
            }
            httpClientBuilder.setKeepAliveStrategy(new DefaultConnectionKeepAliveStrategy());
            return httpClientBuilder;
        });

        return new RestHighLevelClient(restClientBuilder);
    }

    /**
     * Converts a comma-separated list of {@code host:port} entries into {@link HttpHost}s,
     * keeping the configured order. Unlike a map-based split, the same host may appear
     * more than once with different ports.
     */
    private static List<HttpHost> parseHosts(String nodes) {
        if (nodes == null || nodes.trim().isEmpty()) {
            throw new IllegalArgumentException("app.es.nodes must list at least one host:port entry");
        }
        List<HttpHost> hosts = new ArrayList<>();
        for (String node : Splitter.on(',').trimResults().omitEmptyStrings().split(nodes)) {
            int separator = node.lastIndexOf(':');
            // tryParse returns null for a non-numeric port; treat that like a missing port.
            Integer port = separator > 0 ? Ints.tryParse(node.substring(separator + 1)) : null;
            if (port == null) {
                throw new IllegalArgumentException(
                        "Invalid Elasticsearch node '" + node + "', expected host:port");
            }
            hosts.add(new HttpHost(node.substring(0, separator), port, SCHEME));
        }
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("app.es.nodes must list at least one host:port entry");
        }
        return hosts;
    }
}
ESProperties配置类
package com.example.link.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
 * Holder for the {@code app.es.*} configuration properties.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Configuration
@ConfigurationProperties(prefix = "app.es")
@Data
public class ESProperties {

    /** Value of {@code app.es.nodes}: comma-separated {@code host:port} entries. */
    private String nodes;

    /** Value of {@code app.es.connection-max}; 200 when the property is not set. */
    private Integer connectionMax = 200;
}
linkWebBApplicationTests 测试类
package com.example.link;
import com.alibaba.fastjson.JSON;
import com.example.link.domain.User;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
class linkWebBApplicationTests {
@Autowired
private RestHighLevelClient restHighLevelClient;
/**
 * Indexes the same {@code User} document 101 times (i = 0..100), one request per
 * document, refreshing the index immediately after each write.
 *
 * @throws IOException if a request to Elasticsearch fails
 */
@Test
public void test() throws IOException {
    // The payload is identical for every iteration, so serialize it once.
    String payload = JSON.toJSONString(new User("盖伦", 28));
    for (int i = 0; i <= 100; i++) {
        // No id is supplied to the request constructor (index, type only).
        IndexRequest indexRequest = new IndexRequest("hig_index", "hig_type")
                .source(payload, XContentType.JSON);
        indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        log.info("indexResponse:{}", JSON.toJSONString(indexResponse));
    }
}
/**
 * Bulk-indexes 101 {@code User} documents (i = 0..100) in a single request.
 *
 * @throws IOException if the bulk request to Elasticsearch fails
 */
@Test
public void test1() throws IOException {
    BulkRequest request = new BulkRequest();
    for (int i = 0; i <= 100; i++) {
        User user = new User("盖伦" + i, i);
        // A new IndexRequest(index, type) must be created on every iteration; otherwise
        // only the last record is inserted. Inserting this way does not overwrite an
        // existing id, i.e. it cannot be used to update documents.
        request.add(new IndexRequest("hig_index", "hig_type")
                .source(JSON.toJSONString(user), XContentType.JSON));
    }
    BulkResponse bulkItemResponses = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
    if (bulkItemResponses.hasFailures()) {
        // Individual items can fail even though the bulk call itself returned normally.
        log.error("bulk failures:{}", bulkItemResponses.buildFailureMessage());
    }
    log.info("bulkItemResponses:{}", JSON.toJSONString(bulkItemResponses));
}
/**
 * Partially updates the document with id {@code "1"} using the JSON form of a
 * {@code User}, refreshing the index immediately afterwards.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
public void test2() throws IOException {
    User user = new User("盖伦1", 22);
    // NOTE(review): the other tests shown here index without an explicit id, so a
    // document with id "1" presumably has to exist beforehand - confirm.
    UpdateRequest updateRequest = new UpdateRequest("hig_index", "hig_type", "1")
            .doc(JSON.toJSONString(user), XContentType.JSON);
    updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
    log.info("updateResponse:{}", JSON.toJSONString(updateResponse));
}
/**
 * Deletes the document with id {@code "1"} from {@code hig_index}/{@code hig_type}.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
public void test3() throws IOException {
    DeleteRequest deleteRequest = new DeleteRequest("hig_index", "hig_type", "1");
    // Force a refresh right after the delete.
    deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
    DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
    log.info("DeleteResponse:{}", JSON.toJSONString(deleteResponse));
}
/**
 * Deletes every {@code hig_type} document in {@code hig_index} matched by a term
 * query on {@code age} with value {@code "28"}.
 *
 * @throws IOException if the request to Elasticsearch fails
 */
@Test
public void deleteByQueryRequest() throws IOException {
    DeleteByQueryRequest request = new DeleteByQueryRequest("hig_index");
    request.setDocTypes("hig_type");
    request.setQuery(new TermQueryBuilder("age", "28"));
    BulkByScrollResponse bulkByScrollResponse =
            restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
    log.info("bulkByScrollResponse:{}", JSON.toJSONString(bulkByScrollResponse));
}
@Test
public void test4() throws IOException {
// 构建查询条件
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("userName", "盖伦"))
.must(QueryBuilders.termQuery("age", "28"));
// 创建搜索请求
SearchRequest searchRequest = new SearchRequest("hig_index");
// 构建查询
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(queryBuilder)
.from(0)
.size(10);
// 设置查询条件
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// 查询命中的记录
SearchHits searchHits = searchResponse.getHits();
// 获取原始记录,并填充
List



