由于spring和es的集成并不是特别友好,es的高低版本兼容问题、api更新频率高等问题,所以我选择是官网提供的原生Client(RestHighLevelClient),但又不想去关注es的配置类以及和spring的集成配置、jar包冲突等问题,所以使用spring-boot-starter-data-elasticsearch。
一、引入依赖jar二、application.properties配置org.springframework.boot spring-boot-starter-data-elasticsearch
spring.elasticsearch.rest.uris=http://127.0.0.1:9200,http://127.0.0.1:9201,http://127.0.0.1:9202 spring.elasticsearch.rest.connection-timeout=5s spring.elasticsearch.rest.read-timeout=30s logging.level.org.springframework.data.convert.CustomConversions=error
spring-boot-starter-data-elasticsearch中自动装配es的配置类:ElasticsearchRestClientAutoConfiguration、ElasticsearchRestClientProperties。
ElasticsearchRestClientAutoConfiguration:@ConditionalOnClass({RestHighLevelClient.class})
@ConditionalOnMissingBean({RestClient.class})
@EnableConfigurationProperties({ElasticsearchRestClientProperties.class})
public class ElasticsearchRestClientAutoConfiguration {
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnMissingBean({RestHighLevelClient.class})
static class RestHighLevelClientConfiguration {
RestHighLevelClientConfiguration() {
}
@Bean
RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) {
return new RestHighLevelClient(restClientBuilder);
}
}
@Configuration(
proxyBeanMethods = false
)
@ConditionalOnMissingBean({RestClientBuilder.class})
static class RestClientBuilderConfiguration {
RestClientBuilderConfiguration() {
}
@Bean
RestClientBuilderCustomizer defaultRestClientBuilderCustomizer(ElasticsearchRestClientProperties properties) {
return new ElasticsearchRestClientAutoConfiguration.DefaultRestClientBuilderCustomizer(properties);
}
@Bean
RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties, ObjectProvider builderCustomizers) {
HttpHost[] hosts = (HttpHost[])properties.getUris().stream().map(this::createHttpHost).toArray((x$0) -> {
return new HttpHost[x$0];
});
RestClientBuilder builder = RestClient.builder(hosts);
builder.setHttpClientConfigCallback((httpClientBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> {
customizer.customize(httpClientBuilder);
});
return httpClientBuilder;
});
builder.setRequestConfigCallback((requestConfigBuilder) -> {
builderCustomizers.orderedStream().forEach((customizer) -> {
customizer.customize(requestConfigBuilder);
});
return requestConfigBuilder;
});
builderCustomizers.orderedStream().forEach((customizer) -> {
customizer.customize(builder);
});
return builder;
}
private HttpHost createHttpHost(String uri) {
try {
return this.createHttpHost(URI.create(uri));
} catch (IllegalArgumentException var3) {
return HttpHost.create(uri);
}
}
private HttpHost createHttpHost(URI uri) {
if (!StringUtils.hasLength(uri.getUserInfo())) {
return HttpHost.create(uri.toString());
} else {
try {
return HttpHost.create((new URI(uri.getScheme(), (String)null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment())).toString());
} catch (URISyntaxException var3) {
throw new IllegalStateException(var3);
}
}
}
}
}
ElasticsearchRestClientProperties:
@ConfigurationProperties(
prefix = "spring.elasticsearch.rest"
)
public class ElasticsearchRestClientProperties {
private List uris = new ArrayList(Collections.singletonList("http://localhost:9200"));
private String username;
private String password;
private Duration connectionTimeout = Duration.ofSeconds(1L);
private Duration readTimeout = Duration.ofSeconds(30L);
public ElasticsearchRestClientProperties() {
}
public List getUris() {
return this.uris;
}
public void setUris(List uris) {
this.uris = uris;
}
public String getUsername() {
return this.username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return this.password;
}
public void setPassword(String password) {
this.password = password;
}
public Duration getConnectionTimeout() {
return this.connectionTimeout;
}
public void setConnectionTimeout(Duration connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public Duration getReadTimeout() {
return this.readTimeout;
}
public void setReadTimeout(Duration readTimeout) {
this.readTimeout = readTimeout;
}
}
三、使用
ES基本操作持久层
@Repository
@Slf4j
public class EsRepository {
@Resource
private RestHighLevelClient highLevelClient;
public boolean existIndex(String index) {
try {
return highLevelClient.indices().exists(new GetIndexRequest(index), RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("es持久层异常!index={}", index, e);
}
return Boolean.FALSE;
}
public boolean createIndex(String index, Map columnMap) {
if (existIndex(index)) {
return Boolean.FALSE;
}
CreateIndexRequest request = new CreateIndexRequest(index);
if (columnMap != null && columnMap.size() > 0) {
Map source = new HashMap<>();
source.put("properties", columnMap);
request.mapping(source);
}
try {
highLevelClient.indices().create(request, RequestOptions.DEFAULT);
return Boolean.TRUE;
} catch (IOException e) {
log.error("es持久层异常!index={}, columnMap={}", index, columnMap, e);
}
return Boolean.FALSE;
}
public boolean deleteIndex(String index) {
try {
if (existIndex(index)) {
AcknowledgedResponse response = highLevelClient.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
return response.isAcknowledged();
}
} catch (Exception e) {
log.error("es持久层异常!index={}", index, e);
}
return Boolean.FALSE;
}
public boolean insert(String index, String jsonString) {
IndexRequest indexRequest = new IndexRequest(index);
indexRequest.id(new Snowflake().nextIdStr());
indexRequest.source(jsonString, XContentType.JSON);
try {
log.info("indexRequest={}", indexRequest);
IndexResponse indexResponse = highLevelClient.index(indexRequest, RequestOptions.DEFAULT);
log.info("indexResponse={}", indexResponse);
return Boolean.TRUE;
} catch (IOException e) {
log.error("es持久层异常!index={}, jsonString={}", index, jsonString, e);
}
return Boolean.FALSE;
}
public boolean update(String index, Map dataMap) {
UpdateRequest updateRequest = new UpdateRequest(index, dataMap.remove("id").toString());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(dataMap);
try {
highLevelClient.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("es持久层异常!index={}, dataMap={}", index, dataMap, e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
public boolean delete(String index, String id) {
DeleteRequest deleteRequest = new DeleteRequest(index, id);
try {
highLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error("es持久层异常!index={}, id={}", index, id, e);
return Boolean.FALSE;
}
return Boolean.TRUE;
}
}
ES查询持久层
@Repository
@Slf4j
public class EsSearchRepository {
@Resource
private RestHighLevelClient highLevelClient;
public EsQueryRespPO
其中,EsQueryReqPO、EsQueryRespPO、AggregationBucketPO等类如下:
@Data
public class EsQueryReqPO {
String[] index;
QueryBuilder query;
String sortField;
SortOrder sort;
private Integer pageNum;
private Integer pageSize;
private TermsAggregationBuilder termsAggregation;
private CardinalityAggregationBuilder cardinalityAggregation;
}
@Data @NoArgsConstructor @AllArgsConstructor public class EsQueryRespPO{ private Boolean success; private String message; private Integer pageNum; private Integer pageSize; private Long totalSize; private List sourceList; public static EsQueryRespPO success(List sourceList, Integer pageNum, Integer pageSize, Long totalSize) { EsQueryRespPO esQueryRespPO = new EsQueryRespPO<>(); esQueryRespPO.setSuccess(true); esQueryRespPO.setSourceList(sourceList); esQueryRespPO.setPageNum(pageNum); esQueryRespPO.setPageSize(pageSize); esQueryRespPO.setTotalSize(totalSize); return esQueryRespPO; } public static EsQueryRespPO error() { EsQueryRespPO esQueryRespPO = new EsQueryRespPO(); esQueryRespPO.setSuccess(false); esQueryRespPO.setMessage("es查询异常"); return esQueryRespPO; } public static EsQueryRespPO error(String message) { EsQueryRespPO esQueryRespPO = new EsQueryRespPO(); esQueryRespPO.setSuccess(false); esQueryRespPO.setMessage(message); return esQueryRespPO; } }
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AggregationBucketPO {
private String key;
private Long docCount;
private Long docTotal;
}
其它
如果没有用spring-boot-starter-data-elasticsearch来自动注入es组件,那么需要自己做es client的注入,es配置类如下:
@Configuration
public class EsClientConfig {
@Value("${spring.elasticsearch.rest.uris}")
private List uris;
@Bean
public RestHighLevelClient restHighLevelClient() {
List httpHostList = uris.stream().map(HttpHost::create).collect(Collectors.toList());
HttpHost[] httpHost = new HttpHost[uris.size()];
httpHostList.toArray(httpHost);
RestClientBuilder clientBuilder = RestClient.builder(httpHost);
return new RestHighLevelClient(clientBuilder);
}
}
Snowflake是hutool包里的,导包:
cn.hutool
hutool-all
5.7.14
聚合查询的测试用例:
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class EsTest {
@Resource
private EsSearchRepository esSearchRepository;
@Test
public void testSearchAggregation() {
// 查询对象的封装
EsQueryReqPO queryPO = new EsQueryReqPO();
queryPO.setIndex(new String[]{"yzh1", "yzh2"});
queryPO.setPageNum(1);
queryPO.setPageSize(10);
// 时间戳范围
QueryBuilder queryBuilder1 = QueryBuilders.rangeQuery("timestamp")
.from(System.currentTimeMillis() - 1000)
.to(System.currentTimeMillis());
// 登录标识
QueryBuilder queryBuilder2 = QueryBuilders.termQuery("name", "yang");
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(queryBuilder1).must(queryBuilder2);
queryPO.setQuery(queryBuilder);
// 根据userName分组。创建terms桶聚合,聚合名字=terms_by_userName, 字段=payload.userName.keyword
TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders
.terms("terms_by_userName").field("payload.userName.keyword");
termsAggregationBuilder.size(queryPO.getPageSize() * queryPO.getPageNum());
termsAggregationBuilder.subAggregation(new BucketSortPipelineAggregationBuilder("bucket_field", null)
.from((queryPO.getPageNum() - 1) * queryPO.getPageSize()).size(queryPO.getPageSize()));
queryPO.setTermsAggregation(termsAggregationBuilder);
// 根据userName聚合count统计. cardinality名=count_userName, 字段=payload.userName.keyword
CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders
.cardinality("count_userName").field("payload.userName.keyword");
queryPO.setCardinalityAggregation(cardinalityAggregationBuilder);
// 执行查询
EsQueryRespPO esQueryRespPO = esSearchRepository.searchAggregation(queryPO);
}
}



