1、添加依赖、根据项目需要,排除了jackson的三个依赖,可不排除。
org.elasticsearch.client elasticsearch-rest-high-level-client 6.4.3 com.fasterxml.jackson.dataformat jackson-dataformat-smile com.fasterxml.jackson.dataformat jackson-dataformat-yaml com.fasterxml.jackson.dataformat jackson-dataformat-cbor
2、application.yml配置文件
elastic: es-host: 127.0.0.1(es安装服务器公网ip) es-port: 9200 user-name: elastic password: elastic123456 connect-count: 50 connect-per-route: 10
3、EsProperties配置读取类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = EsProperties.PREFIX)
public class EsProperties {
public static final String PREFIX = "elastic";
private String esHost;
private Integer esPort;
private String userName;
private String password;
private Integer connectCount;
private Integer connectPerRoute;
}
4、添加EsClient工厂类EsClientSpringFactory
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import java.io.IOException;
@Slf4j
public class EsClientSpringFactory {
private static final int CONNECT_TIMEOUT_MILLIS = 1000;
private static final int SOCKET_TIMEOUT_MILLIS = 30000;
private static final int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
private static int MAX_CONN_PER_ROUTE;
private static int MAX_CONN_TOTAL;
private static HttpHost HTTP_HOST;
private static String USER_NAME;
private static String PASSWORD;
private RestClientBuilder builder;
private RestClient restClient;
private RestHighLevelClient restHighLevelClient;
private static final EsClientSpringFactory ES_CLIENT_SPRING_FACTORY = new EsClientSpringFactory();
private EsClientSpringFactory() {
}
public static EsClientSpringFactory build(HttpHost esHost, String userName, String pwd, Integer maxConnectNum, Integer maxConnectPerRoute) {
HTTP_HOST = esHost;
USER_NAME = userName;
PASSWORD = pwd;
MAX_CONN_TOTAL = maxConnectNum;
MAX_CONN_PER_ROUTE = maxConnectPerRoute;
return ES_CLIENT_SPRING_FACTORY;
}
public void init() {
builder = RestClient.builder(HTTP_HOST);
setConnectTimeOutConfig();
setMuftiConnectConfig();
restClient = builder.build();
restHighLevelClient = new RestHighLevelClient(builder);
}
public void setConnectTimeOutConfig() {
builder.setRequestConfigCallback(requestConfigBuilder -> {
requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
return requestConfigBuilder;
});
}
public void setMuftiConnectConfig() {
//连接Elasticsearch集群
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(USER_NAME, PASSWORD));
builder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setMaxConnTotal(MAX_CONN_TOTAL);
httpClientBuilder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpClientBuilder;
});
}
public RestClient getClient() {
return restClient;
}
public RestHighLevelClient getRhlClient() {
return restHighLevelClient;
}
public void close() {
if (restClient != null) {
try {
restClient.close();
} catch (IOException e) {
log.error(e.getMessage());
}
}
log.info("ES客户端实例关闭");
}
}
5、创建连接EsConnectionBean
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Configuration
@ComponentScan(basePackageClasses = EsClientSpringFactory.class)
public class EsConnectionBean {
@Resource
private EsProperties esConnectionProperties;
@Bean
public HttpHost httpHost() {
return new HttpHost(esConnectionProperties.getEsHost(), esConnectionProperties.getEsPort(), "http");
}
@Bean(initMethod = "init", destroyMethod = "close")
public EsClientSpringFactory getFactory() {
return EsClientSpringFactory.build(
httpHost(),
esConnectionProperties.getUserName(),
esConnectionProperties.getPassword(),
esConnectionProperties.getConnectCount(),
esConnectionProperties.getConnectPerRoute()
);
}
@Bean
public RestClient getRestClient() {
return getFactory().getClient();
}
@Bean
public RestHighLevelClient getRhlClient() {
return getFactory().getRhlClient();
}
}
6、实现类EsServiceImpl
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.stereotype.Service;
import tech.honghu.common.exceptions.BizException;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;
@Slf4j
@Service
public class EsServiceImpl implements EsService {
@Resource
private RestHighLevelClient restHighLevelClient;
@Override
public ResultWrapper createIndex(String index, String settings, String mappings) {
try {
CreateIndexResponse createIndexResponse = null;
if (!existsIndex(index)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
if (StringUtils.isNotBlank(settings)) {
buildIndexSetting(createIndexRequest, settings);
}
//buildIndexMapping(createIndexRequest, mappings);
createIndexRequest.settings();
createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
}
return new ResultWrapper().success(createIndexResponse == null ? "索引已存在" : JSON.toJSONString(createIndexResponse));
} catch (Exception e) {
log.error("索引创建失败", e);
return new ResultWrapper().fail(e.getMessage());
}
}
@Override
public void buildIndexSetting(CreateIndexRequest createIndexRequest, String setting) {
createIndexRequest.settings(setting, XContentType.JSON);
}
@Override
public void buildIndexMapping(CreateIndexRequest createIndexRequest, String mapping) {
createIndexRequest.mapping(mapping, XContentType.JSON);
}
@Override
public ResultWrapper deleteIndex(String indexName) {
try {
if (!existsIndex(indexName)) {
return new ResultWrapper().success("索引不存在");
}
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
return new ResultWrapper().success("索引删除成功");
} catch (Exception e) {
log.error("索引删除失败", e);
return new ResultWrapper().fail(e.getMessage());
}
}
@Override
public ResultWrapper createIndexDoc(IndexDocDTO indexDocDto) {
IndexRequest request = new IndexRequest();
request.index(indexDocDto.getIndex());
request.type(indexDocDto.getType());
request.id(StringUtils.isNotBlank(indexDocDto.getId()) ? indexDocDto.getId() : UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
request.source(indexDocDto.getSource(), XContentType.JSON);
try {
IndexResponse index = restHighLevelClient.index(request, RequestOptions.DEFAULT);
return new ResultWrapper().success(JSON.toJSONString(index));
} catch (IOException e) {
log.error("创建文档失败", e);
return new ResultWrapper().fail(e.getMessage());
}
}
@Override
public ResultWrapper updateIndexDoc(IndexDocDTO indexDocDto) {
try {
UpdateRequest updateRequest = new UpdateRequest(indexDocDto.getIndex(), indexDocDto.getType(), indexDocDto.getId());
updateRequest.doc(indexDocDto.getSource(), XContentType.JSON);
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
return new ResultWrapper().success(JSON.toJSONString(updateResponse));
} catch (Exception e) {
log.error("更新文档失败", e);
return new ResultWrapper().fail(e.getMessage());
}
}
@Override
public ResultWrapper deleteIndexDoc(IndexDocDTO indexDocDto) {
try {
DeleteRequest deleteRequest = new DeleteRequest(indexDocDto.getIndex(), indexDocDto.getType(), indexDocDto.getId());
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
return new ResultWrapper().success(JSON.toJSONString(deleteResponse));
} catch (IOException e) {
log.error("删除文档失败", e);
return new ResultWrapper().fail(e.getMessage());
}
}
@Override
public Boolean existsIndex(String index) {
GetIndexRequest request = new GetIndexRequest();
request.indices(index);
Boolean flag = false;
try {
flag = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
return flag;
}
@Override
public ResultWrapper bulkInsert(List IndexDocDTOList) {
try {
if (CollectionUtils.isEmpty(IndexDocDTOList)) {
return new ResultWrapper().fail("插入数据为空");
}
BulkRequest bulkAddRequest = new BulkRequest();
for (IndexDocDTO IndexDocDTO : IndexDocDTOList) {
IndexRequest indexRequest = new IndexRequest(IndexDocDTO.getIndex());
indexRequest.id(StringUtils.isNotBlank(IndexDocDTO.getId()) ? IndexDocDTO.getId() : UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
indexRequest.type(IndexDocDTO.getType());
indexRequest.source(IndexDocDTO.getSource(), XContentType.JSON);
bulkAddRequest.add(indexRequest);
}
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkAddRequest, RequestOptions.DEFAULT);
return new ResultWrapper().success(JSON.toJSONString(bulkResponse));
} catch (Exception e) {
log.error("批量添加文档失败", e);
return new ResultWrapper().fail(e.getMessage());
}
}
@Override
public ResultWrapper shoppingUserOrderSearch(SearchDocDTO searchDocDTO) {
String search = validateSearch(searchDocDTO);
if (StringUtils.isNotBlank(search)) {
return new ResultWrapper().fail(search);
}
try {
int pageNo = searchDocDTO.getPage();
int pageSize = searchDocDTO.getPageSize();
int startIndex = (pageNo - 1) * pageSize;
//获取索引名称
String index = searchDocDTO.getIndex();
IPage
7、IndexDocDTO类
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
@Data
public class IndexDocDTO {
private String index;
private String id;
private String type;
private JSONObject source;
}
8、SearchDocDTO类
import lombok.Data;
@Data
public class SearchDocDTO {
private String userId;
private String merchantId;
private String id;
private String type;
private String index;
private String fieldName;
private String fields;
private String keywords;
private Integer page;
private Integer pageSize;
private Boolean isHighlight = false;
}



