将RestHighLevelClient添加到Spring容器
@Value("${elasticsearch.server.host:localhost}")
private String[] elasticsearchHost;
@Bean
public RestHighLevelClient restHighLevelClient() {
try {
HttpHost[] httpHosts = new HttpHost[elasticsearchHost.length];
//将地址转换为http主机数组,未配置端口则采用默认9200端口,配置了端口则用配置的端口
for (int i = 0; i < httpHosts.length; i++) {
if (!StringUtils.isEmpty(elasticsearchHost[i])) {
if (elasticsearchHost[i].contains(":")) {
String[] uris = elasticsearchHost[i].split(":");
httpHosts[i] = new HttpHost(uris[0], Integer.parseInt(uris[1]), "http");
} else {
httpHosts[i] = new HttpHost(elasticsearchHost[i], 9200, "http");
}
}
}
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(httpHosts)
.setRequestConfigCallback(requestConfigBuilder->requestConfigBuilder
.setConnectTimeout(3000)
.setSocketTimeout(5000)
.setConnectionRequestTimeout(500)));
return client;
} catch (Exception e) {
logger.error("创建elasticsearch客户端异常:", e);
throw new RuntimeException(e);
}
}
封装执行客户端
@Component
public class ElasticsearchClientUtil {
@Autowired
private RestHighLevelClient restHighLevelClient;
@Autowired
private SnowflakeIdUtils idUtils;
private Logger logger = LoggerFactory.getLogger(this.getClass());
public boolean exists(String indexName) {
try{
GetIndexRequest indexRequest = new GetIndexRequest(indexName);
return restHighLevelClient.indices().exists(indexRequest, RequestOptions.DEFAULT);
}catch (NonodeAvailableException e) {
logger.error("Elasticsearch服务不可用: ", e);
throw new RuntimeException("Elasticsearch服务不可用");
} catch (Exception e) {
logger.error("Elasticsearch数据异常: ", e);
throw new RuntimeException("Elasticsearch数据异常");
}
}
public boolean create(Class clazz) throws IOException{
return create(getIndexName(clazz), clazz);
}
public boolean create(String indexName, Class clazz) {
CreateIndexRequest requestIndex = new CreateIndexRequest(indexName);
requestIndex.mapping(getMapping(clazz));
try{
CreateIndexResponse indexResponse = restHighLevelClient.indices().create(requestIndex, RequestOptions.DEFAULT);
if (!indexResponse.isAcknowledged()) {//索引创建失败
return false;
}
//给集群一点时间拷贝副本
//TimeUnit.SECONDS.sleep(2);
//集群操作,再添加数据前可以先判断索引是否存在
}catch (NonodeAvailableException e) {
logger.error("Elasticsearch服务不可用: ", e);
throw new RuntimeException("Elasticsearch服务不可用");
} catch (Exception e) {
logger.error("Elasticsearch数据异常: ", e);
throw new RuntimeException("Elasticsearch数据异常");
}
return true;
}
private String getIndexName(Class clazz){
document doc = (document) clazz.getAnnotation(document.class);
return doc.indexName();
}
private Map getMapping(Class clazz){
Map properties = new ConcurrentHashMap();
for (java.lang.reflect.Field field : clazz.getDeclaredFields()) {
if (field.isAnnotationPresent(Field.class)) {
Map property = new ConcurrentHashMap();
Field fieldAnnotation = field.getAnnotation(Field.class);
property.put("type",fieldAnnotation.type().name().toLowerCase());
if (!"".equals(fieldAnnotation.analyzer())) {
property.put("analyzer", fieldAnnotation.analyzer());
}
if (!"".equals(fieldAnnotation.searchAnalyzer())) {
property.put("search_analyzer", fieldAnnotation.searchAnalyzer());
}
if (TermVector.none!=fieldAnnotation.termVector()){
property.put("term_vector",fieldAnnotation.termVector().name());
}
if (fieldAnnotation.store()) {
properties.put(field.getName(), property);
}
}
}
Map result = new HashMap();
result.put("properties",properties);
return result;
}
public String save(String indexName, Object index) {
try{
IndexResponse indexResponse = restHighLevelClient.index(getIndexRequest(indexName, index),RequestOptions.DEFAULT);
return indexResponse.getId();
}catch (NonodeAvailableException e) {
logger.error("Elasticsearch服务不可用: ", e);
throw new RuntimeException("Elasticsearch服务不可用");
} catch (Exception e) {
logger.error("Elasticsearch数据异常: ", e);
throw new RuntimeException("Elasticsearch数据异常");
}
}
private IndexRequest getIndexRequest(String indexName,Object index){
IndexRequest request = new IndexRequest(indexName);
String indexString = JSONObject.toJSonString(index);
JSonObject jsonObject = JSONObject.parseObject(indexString);
String id = analysisEntity(jsonObject, index.getClass());
request.id(id);
request.source(jsonObject, XContentType.JSON);
return request;
}
private String analysisEntity(JSonObject jsonObject,Class clazz){
String id = null;
for (java.lang.reflect.Field field : clazz.getDeclaredFields()) {
if (field.isAnnotationPresent(Id.class)) {
if(jsonObject.get(field.getName()) == null){ ;
id = idUtils.nextId()+"";
jsonObject.put(field.getName(), id);
}else {
id = jsonObject.get(field.getName()).toString();
}
}
if (field.isAnnotationPresent(Field.class)) {
if (!field.getAnnotation(Field.class).store()) {
jsonObject.remove(field.getName());
}
}
}
return id;
}
public Boolean batchSave(String indexName, Object... indexs) {
BulkRequest bulkRequest = new BulkRequest();
for (Object index : indexs) {
bulkRequest.add(getIndexRequest(indexName, index));
}
try{
BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
if (RestStatus.OK == bulkResponse.status()) {
return true;
}
}catch (NonodeAvailableException e) {
logger.error("Elasticsearch服务不可用: ", e);
throw new RuntimeException("Elasticsearch服务不可用");
} catch (Exception e) {
logger.error("Elasticsearch数据异常: ", e);
throw new RuntimeException("Elasticsearch数据异常");
}
return false;
}
public boolean update(String indexName,Object index) {
String indexString = JSONObject.toJSonString(index);
JSonObject jsonObject = JSONObject.parseObject(indexString);
String id = analysisEntity(jsonObject, index.getClass());
UpdateRequest updateRequest = new UpdateRequest(indexName,id);
updateRequest.doc(jsonObject);
try{
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
return RestStatus.OK == updateResponse.status();
}catch (NonodeAvailableException e) {
logger.error("Elasticsearch服务不可用: ", e);
throw new RuntimeException("Elasticsearch服务不可用");
} catch (ElasticsearchStatusException e){
logger.error("elasticsearch索引状态错误",e);
throw new RuntimeException("索引记录异常,索引ID:"+id);
}catch (Exception e) {
logger.error("Elasticsearch数据异常: ", e);
throw new RuntimeException("Elasticsearch数据异常");
}
}
public Boolean delete(String indexName,String id) {
DeleteRequest deleteRequest = new DeleteRequest(indexName, id);
try {
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
return RestStatus.OK == deleteResponse.status();
}catch (NonodeAvailableException e) {
logger.error("Elasticsearch服务不可用: ", e);
throw new RuntimeException("Elasticsearch服务不可用");
}catch (ElasticsearchStatusException e){
logger.error("elasticsearch索引状态错误",e);
throw new RuntimeException("索引记录异常,索引ID:"+id);
} catch (Exception e) {
logger.error("Elasticsearch数据异常: ", e);
throw new RuntimeException("Elasticsearch数据异常");
}
}
public String save(Object index) throws IOException {
return save(getIndexName(index.getClass()), index);
}
public QueryData searchdocument(IndexRequestParams indexRequestParams,Class clazz,String... indexName){
QueryData queryData = new QueryData();
try {
SearchSourceBuilder sourceBuilder = getSearchSourceBuilder(indexRequestParams,clazz);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(indexName);
searchRequest.source(sourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
SearchHits hits = searchResponse.getHits();
//封装查询到的数据
List 


