一、背景
Spring Data 的目的是用统一的接口,适配所有不同的存储类型。
Spring Data Elasticsearch 是 Spring Data 的一个子项目。Spring Data 旨在为新型数据存储提供熟悉且一致的、基于 Spring 的编程模型,同时保留各存储特有的功能和能力。Spring Data Elasticsearch 以 POJO 为中心,用于与 Elasticsearch 文档交互,并轻松编写 Repository 风格的数据访问层。
二、依赖
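<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- 其他工具类 -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.5</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
<dependency>
    <groupId>com.openhtmltopdf</groupId>
    <artifactId>openhtmltopdf-core</artifactId>
    <version>0.0.1-RC9</version>
</dependency>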
三、工具类代码
1、ESUtil工具类
package com.msb.es.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.github.pagehelper.PageInfo;
import com.msb.es.dto.document;
import com.msb.es.dto.EsDataId;
import com.msb.es.dto.enums.FieldType;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.Timevalue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.*;
@Slf4j
@Component
public class ESUtil {
// High-level REST client injected via @Resource; every index and document call in this class goes through it.
@Resource
private RestHighLevelClient restHighLevelClient;
private static int index_number_of_shards = 3;// default number of shards
private static int index_number_of_replicas = 1;// default number of replicas (single node)
/**
 * Overrides the shard and replica counts used when an index is created afterwards.
 *
 * <p>Both values live in static fields, so the change applies to every
 * {@code ESUtil} instance, not only the one this method is invoked on.
 *
 * @param index_number_of_shards   value written to {@code index.number_of_shards}
 * @param index_number_of_replicas value written to {@code index.number_of_replicas}
 */
public void setIndexNumber(int index_number_of_shards, int index_number_of_replicas) {
    // Qualify with the class name: the targets are static, and "this." disguised that.
    ESUtil.index_number_of_shards = index_number_of_shards;
    ESUtil.index_number_of_replicas = index_number_of_replicas;
}
/**
 * Hands out the injected high-level REST client for calls this utility does not wrap.
 *
 * @return the client held by this component
 */
public RestHighLevelClient getInstance() {
    return this.restHighLevelClient;
}
//region 创建索引(默认分片数为3和副本数为1)
/**
 * Creates the index named by the {@code document} annotation on {@code clazz},
 * using the mapping generated from that class.
 *
 * @param clazz entity class carrying the {@code document} annotation
 * @return the result reported by {@code createRootIndex}
 * @throws Exception if the class has no {@code document} annotation, or the create call fails
 */
public boolean createIndex(Class clazz) throws Exception {
    document indexMeta = (document) clazz.getDeclaredAnnotation(document.class);
    if (indexMeta == null) {
        throw new Exception(String.format("class name: %s can not find Annotation [document], please check", clazz.getName()));
    }
    return createRootIndex(indexMeta.indexName(), clazz);
}
/**
 * Creates the index for {@code clazz} only when {@code isIndexExists} reports it missing.
 *
 * @param clazz entity class carrying the {@code document} annotation
 * @return {@code true} if the index was created by this call; {@code false} if it
 *         already existed or the creation was not acknowledged
 * @throws Exception if the class has no {@code document} annotation, or the create call fails
 */
public boolean createIndexIfNotExist(Class clazz) throws Exception {
    document indexMeta = (document) clazz.getDeclaredAnnotation(document.class);
    if (indexMeta == null) {
        throw new Exception(String.format("class name: %s can not find Annotation [document], please check", clazz.getName()));
    }
    String target = indexMeta.indexName();
    if (isIndexExists(target)) {
        return false;
    }
    return createRootIndex(target, clazz);
}
/**
 * Sends the create-index request with the configured shard/replica settings and
 * the mapping produced by {@code generateBuilder}.
 *
 * @param indexName name of the index to create
 * @param clazz     class whose annotated fields define the mapping
 * @return {@code true} if the response reports the request acknowledged, or the
 *         shards acknowledged
 * @throws IOException if the mapping cannot be built or the request fails
 */
private boolean createRootIndex(String indexName, Class clazz) throws IOException {
    CreateIndexRequest createRequest = new CreateIndexRequest(indexName);
    // Shard and replica counts come from the static defaults of this class.
    Settings.Builder indexSettings = Settings.builder()
            .put("index.number_of_shards", index_number_of_shards)
            .put("index.number_of_replicas", index_number_of_replicas);
    createRequest.settings(indexSettings);
    createRequest.mapping(generateBuilder(clazz));
    CreateIndexResponse created = restHighLevelClient.indices().create(createRequest, RequestOptions.DEFAULT);
    if (created.isAcknowledged()) {
        return true;
    }
    return created.isShardsAcknowledged();
}
//endregion
//region 更新索引
/**
 * Pushes the mapping generated from {@code clazz} to its existing index via a
 * put-mapping request.
 *
 * @param clazz entity class carrying the {@code document} annotation
 * @return whether the put-mapping response was acknowledged
 * @throws Exception if the class has no {@code document} annotation, or the request fails
 */
public boolean updateIndex(Class clazz) throws Exception {
    document indexMeta = (document) clazz.getDeclaredAnnotation(document.class);
    if (indexMeta == null) {
        throw new Exception(String.format("class name: %s can not find Annotation [document], please check", clazz.getName()));
    }
    PutMappingRequest mappingRequest = new PutMappingRequest(indexMeta.indexName());
    mappingRequest.source(generateBuilder(clazz));
    return restHighLevelClient.indices()
            .putMapping(mappingRequest, RequestOptions.DEFAULT)
            .isAcknowledged();
}
//endregion
//region 删除索引
/**
 * Deletes an index, using lenient index options.
 *
 * @param indexName name of the index to delete
 * @return {@code true} if the delete was acknowledged; {@code false} if it was not,
 *         or if the request failed with an {@link IOException} (which is logged)
 */
public boolean delIndex(String indexName) {
    try {
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
        deleteIndexRequest.indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN);
        AcknowledgedResponse response = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
        return response.isAcknowledged();
    } catch (IOException e) {
        // Route the failure through the class logger instead of printStackTrace().
        log.error("failed to delete index [{}]", indexName, e);
        return false;
    }
}
//endregion
//region 判断索引是否存在
/**
 * Checks whether an index exists.
 *
 * @param indexName name of the index to look up
 * @return {@code true} if the cluster reports the index; {@code false} if it does not,
 *         or if the request failed with an {@link IOException} (which is logged)
 */
public boolean isIndexExists(String indexName) {
    try {
        GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);
        getIndexRequest.humanReadable(true);
        return restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        // Route the failure through the class logger instead of printStackTrace().
        log.error("failed to check existence of index [{}]", indexName, e);
        return false;
    }
}
//endregion
//region 添加单条数据
/**
 * Indexes a single object as a JSON document.
 *
 * <p>The target index comes from the {@code document} annotation on the object's class.
 * If a field is annotated with {@code EsDataId} and holds a non-null value, that value's
 * string form is used as the document id; otherwise no id is set on the request.
 *
 * @param o the object to index
 * @return the response of the index call
 * @throws Exception if the class has no {@code document} annotation, or the request fails
 */
public IndexResponse index(Object o) throws Exception {
    document declaredAnnotation = (document) o.getClass().getDeclaredAnnotation(document.class);
    if (declaredAnnotation == null) {
        throw new Exception(String.format("class name: %s can not find Annotation [document], please check", o.getClass().getName()));
    }
    IndexRequest request = new IndexRequest(declaredAnnotation.indexName());
    Field idField = getFieldByAnnotation(o, EsDataId.class);
    if (idField != null) {
        idField.setAccessible(true);
        try {
            Object id = idField.get(o);
            // A null id used to blow up in toString(); leave the id unset instead.
            if (id != null) {
                request.id(id.toString());
            }
        } catch (IllegalAccessException e) {
            // Previously swallowed silently; the document is still indexed, without an explicit id.
            log.warn("cannot read id field [{}] of {}", idField.getName(), o.getClass().getName(), e);
        }
    }
    request.source(JSON.toJSONString(o), XContentType.JSON);
    return restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
//endregion
//region queryById
/**
 * Fetches one document by id and returns its source as a string.
 *
 * @param indexName index to read from
 * @param id        document id
 * @return the value of {@code getSourceAsString()} on the get response
 * @throws IOException if the get request fails
 */
public String queryById(String indexName, String id) throws IOException {
    GetResponse fetched = restHighLevelClient.get(new GetRequest(indexName, id), RequestOptions.DEFAULT);
    return fetched.getSourceAsString();
}
//endregion
//region 查询封装返回json字符串
/**
 * Runs a search against one index and returns the hits' sources as a JSON array string.
 *
 * <p>Only the hits of this single response are returned. No scroll context is requested:
 * the scroll id was never consumed or cleared, so asking for one only held server-side
 * resources until the keep-alive expired.
 *
 * @param indexName           index to search
 * @param searchSourceBuilder query, paging and sorting definition
 * @return JSON array string with one element per hit source
 * @throws IOException if the search request fails
 */
public String search(String indexName, SearchSourceBuilder searchSourceBuilder) throws IOException {
    SearchRequest searchRequest = new SearchRequest(indexName);
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    JSONArray rows = new JSONArray();
    for (SearchHit hit : searchResponse.getHits()) {
        rows.add(JSON.parseObject(hit.getSourceAsString()));
    }
    return rows.toJSONString();
}
//endregion
//region 查询封装,带分页
/**
 * Runs a search and wraps the hits in a {@code PageInfo}.
 *
 * <p>{@code pageNum} and {@code pageSize} are only used to fill in the page metadata;
 * this method does not apply them to {@code searchSourceBuilder}, so the caller must set
 * the matching {@code from}/{@code size} on the builder.
 *
 * @param searchSourceBuilder query definition, including from/size
 * @param pageNum             page number recorded on the result
 * @param pageSize            page size used to compute the page count; must be positive
 * @param s                   entity class carrying the {@code document} annotation;
 *                            hits are converted to this type
 * @param <T>                 entity type
 * @return page holding the converted hits, total hit count, page count and next-page flag
 * @throws IllegalArgumentException if {@code pageSize} is not positive
 * @throws Exception if the class has no {@code document} annotation, or the search fails
 */
public <T> PageInfo<T> search(SearchSourceBuilder searchSourceBuilder, int pageNum, int pageSize, Class<T> s) throws Exception {
    if (pageSize <= 0) {
        // The page-count arithmetic below would otherwise divide by zero or go negative.
        throw new IllegalArgumentException("pageSize must be positive, got " + pageSize);
    }
    document declaredAnnotation = s.getDeclaredAnnotation(document.class);
    if (declaredAnnotation == null) {
        throw new Exception(String.format("class name: %s can not find Annotation [document], please check", s.getName()));
    }
    SearchRequest searchRequest = new SearchRequest(declaredAnnotation.indexName());
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    SearchHits hits = searchResponse.getHits();
    JSONArray rows = new JSONArray();
    for (SearchHit hit : hits) {
        rows.add(JSON.parseObject(hit.getSourceAsString()));
    }
    // Total hits can be absent when hit tracking is disabled on the request; keep it as long to avoid truncation.
    long total = hits.getTotalHits() == null ? 0L : hits.getTotalHits().value;
    int pages = (int) ((total + pageSize - 1) / pageSize);

    PageInfo<T> page = new PageInfo<>();
    page.setList(rows.toJavaList(s));
    page.setPageNum(pageNum);
    page.setPageSize(pageSize);
    page.setTotal(total);
    page.setPages(pages);
    page.setHasNextPage(pageNum < pages);
    return page;
}
//endregion
//region 查询封装,返回集合
/**
 * Runs a search and converts the hits of the response into a list of entities.
 *
 * <p>The target index comes from the {@code document} annotation on {@code s}. No scroll
 * context is requested: the scroll id was never consumed or cleared, so asking for one only
 * held server-side resources until the keep-alive expired.
 *
 * @param searchSourceBuilder query, paging and sorting definition
 * @param s                   entity class carrying the {@code document} annotation;
 *                            hits are converted to this type
 * @param <T>                 entity type
 * @return the converted hits of this single response
 * @throws Exception if the class has no {@code document} annotation, or the search fails
 */
public <T> List<T> search(SearchSourceBuilder searchSourceBuilder, Class<T> s) throws Exception {
    // With Class<T> the annotation lookup is typed; the raw-Class version lacked the cast and did not compile.
    document declaredAnnotation = s.getDeclaredAnnotation(document.class);
    if (declaredAnnotation == null) {
        throw new Exception(String.format("class name: %s can not find Annotation [document], please check", s.getName()));
    }
    SearchRequest searchRequest = new SearchRequest(declaredAnnotation.indexName());
    searchRequest.source(searchSourceBuilder);
    SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
    JSONArray rows = new JSONArray();
    for (SearchHit hit : searchResponse.getHits()) {
        rows.add(JSON.parseObject(hit.getSourceAsString()));
    }
    return rows.toJavaList(s);
}
//endregion
//region 批量插入文档
/**
 * Indexes a batch of objects with one bulk request.
 *
 * <p>The target index is taken from the {@code document} annotation on the class of the
 * FIRST element; every element is written to that index. For each element, a field
 * annotated with {@code EsDataId} that holds a non-null value supplies the document id.
 *
 * @param list    objects to index; an empty list is a no-op
 * @param izAsync {@code true} to submit the bulk request asynchronously and return at once;
 *                {@code false} to wait for the bulk response
 * @return for synchronous calls, the value of {@code outResult} for the bulk response;
 *         {@code true} for asynchronous calls (submission only) and for an empty list
 * @throws IllegalArgumentException if {@code list} is {@code null}
 * @throws Exception if the first element's class has no {@code document} annotation,
 *                   or the synchronous bulk call fails
 */
public boolean batchSaveOrUpdate(List<?> list, boolean izAsync) throws Exception {
    if (list == null) {
        throw new IllegalArgumentException("list must not be null");
    }
    if (list.isEmpty()) {
        // Nothing to write; list.get(0) would otherwise throw.
        return true;
    }
    Object first = list.get(0);
    document declaredAnnotation = (document) first.getClass().getDeclaredAnnotation(document.class);
    if (declaredAnnotation == null) {
        throw new Exception(String.format("class name: %s can not find Annotation [@document], please check", first.getClass().getName()));
    }
    final String indexName = declaredAnnotation.indexName();
    BulkRequest request = new BulkRequest(indexName);
    for (Object o : list) {
        IndexRequest indexReq = new IndexRequest().source(JSON.toJSONString(o), XContentType.JSON);
        Field idField = getFieldByAnnotation(o, EsDataId.class);
        if (idField != null) {
            idField.setAccessible(true);
            try {
                Object id = idField.get(o);
                // A null id used to blow up in toString(); leave the id unset instead.
                if (id != null) {
                    indexReq.id(id.toString());
                }
            } catch (IllegalAccessException e) {
                log.warn("cannot read id field [{}] of {}", idField.getName(), o.getClass().getName(), e);
            }
        }
        request.add(indexReq);
    }
    // The two branches were swapped: izAsync == true used to run the blocking call.
    if (izAsync) {
        restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, new ActionListener<BulkResponse>() {
            @Override
            public void onResponse(BulkResponse bulkResponse) {
                outResult(bulkResponse);
            }

            @Override
            public void onFailure(Exception e) {
                // Previously empty: asynchronous failures vanished without a trace.
                log.error("async bulk request to index [{}] failed", indexName, e);
            }
        });
        return true;
    }
    BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
    return outResult(bulkResponse);
}
//endregion
//region 删除文档
/**
 * Deletes one document by id.
 *
 * <p>Shard-level failures reported in the response are logged as warnings.
 *
 * @param indexName index holding the document
 * @param docId     id of the document to delete
 * @return {@code true} only if the response result is {@code DELETED}; previously this
 *         method returned {@code true} unconditionally, even when nothing was deleted
 * @throws IOException if the delete request fails
 */
public boolean deleteDoc(String indexName, String docId) throws IOException {
    DeleteRequest request = new DeleteRequest(indexName, docId);
    DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
    ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
    if (shardInfo.getFailed() > 0) {
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            // The reason used to be read into a local and dropped.
            log.warn("delete of [{}/{}] failed on a shard: {}", indexName, docId, failure.reason());
        }
    }
    return deleteResponse.getResult() == DocWriteResponse.Result.DELETED;
}
//endregion
//region 根据json类型更新文档
/**
 * Partially updates a document with the JSON form of {@code o}.
 *
 * @param indexName index holding the document
 * @param docId     id of the document to update
 * @param o         object serialized to JSON and sent as the partial document
 * @return {@code true} if the response result is {@code CREATED} or {@code UPDATED};
 *         {@code false} for any other result (e.g. {@code NOOP}, {@code DELETED})
 * @throws IOException if the update request fails
 */
public boolean updateDoc(String indexName, String docId, Object o) throws IOException {
    UpdateRequest request = new UpdateRequest(indexName, docId);
    request.doc(JSON.toJSONString(o), XContentType.JSON);
    UpdateResponse updateResponse = restHighLevelClient.update(request, RequestOptions.DEFAULT);
    DocWriteResponse.Result result = updateResponse.getResult();
    return result == DocWriteResponse.Result.CREATED || result == DocWriteResponse.Result.UPDATED;
}
//endregion
//region 根据Map类型更新文档
/**
 * Partially updates a document with the entries of {@code map}.
 *
 * @param indexName index holding the document
 * @param docId     id of the document to update
 * @param map       field/value pairs sent as the partial document
 * @return {@code true} if the response result is {@code CREATED} or {@code UPDATED};
 *         {@code false} otherwise
 * @throws IOException if the update request fails
 */
public boolean updateDoc(String indexName, String docId, Map map) throws IOException {
    UpdateRequest partialUpdate = new UpdateRequest(indexName, docId);
    partialUpdate.doc(map);
    DocWriteResponse.Result outcome =
            restHighLevelClient.update(partialUpdate, RequestOptions.DEFAULT).getResult();
    if (outcome == DocWriteResponse.Result.CREATED) {
        return true;
    }
    return outcome == DocWriteResponse.Result.UPDATED;
}
//endregion
//region generateBuilder
/**
 * Builds the index mapping ({@code {"properties": {...}}}) from the fields of
 * {@code clazz} that carry the {@code com.msb.es.dto.Field} annotation.
 *
 * <p>A field whose annotation type is {@code OBJECT} becomes a nested
 * {@code properties} object built from the annotated fields of the field's own type
 * (one level deep). Every other annotated field is written by
 * {@code writeFieldMapping}.
 *
 * @param clazz class whose annotated fields define the mapping
 * @return builder holding the complete mapping object
 * @throws IOException if writing to the builder fails
 */
public XContentBuilder generateBuilder(Class clazz) throws IOException {
    document doc = (document) clazz.getAnnotation(document.class);
    // Was System.out.println(doc.indexName()), which also threw NPE when the annotation was absent.
    log.debug("building mapping for index [{}] from {}", doc == null ? null : doc.indexName(), clazz.getName());

    XContentBuilder builder = XContentFactory.jsonBuilder();
    builder.startObject();
    builder.startObject("properties");
    for (Field f : clazz.getDeclaredFields()) {
        com.msb.es.dto.Field mapping = f.getDeclaredAnnotation(com.msb.es.dto.Field.class);
        if (mapping == null) {
            continue;
        }
        if (mapping.type() == FieldType.OBJECT) {
            // Nested object: emit its annotated fields under their own "properties".
            Class<?> nestedType = f.getType();
            builder.startObject(f.getName());
            builder.startObject("properties");
            for (Field nested : nestedType.getDeclaredFields()) {
                com.msb.es.dto.Field nestedMapping = nested.getDeclaredAnnotation(com.msb.es.dto.Field.class);
                if (nestedMapping != null) {
                    writeFieldMapping(builder, nested, nestedMapping);
                }
            }
            builder.endObject();
            builder.endObject();
        } else {
            writeFieldMapping(builder, f, mapping);
        }
    }
    // Close "properties", then the root object.
    builder.endObject();
    builder.endObject();
    return builder;
}

/**
 * Writes the mapping entry of a single field: its type, plus an analyzer for
 * {@code TEXT} fields and a fixed date format for {@code DATE} fields.
 */
private void writeFieldMapping(XContentBuilder builder, Field field, com.msb.es.dto.Field mapping) throws IOException {
    builder.startObject(field.getName());
    builder.field("type", mapping.type().getType());
    // Only TEXT fields get an analyzer.
    if (mapping.type() == FieldType.TEXT) {
        builder.field("analyzer", mapping.analyzer().getType());
    }
    if (mapping.type() == FieldType.DATE) {
        builder.field("format", "yyyy-MM-dd HH:mm:ss");
    }
    builder.endObject();
}
//endregion
//region getFieldByAnnotation
/**
 * Finds the first field declared directly on the object's class that carries the
 * given annotation.
 *
 * @param o               object whose class is inspected (superclass fields are not searched)
 * @param annotationClass annotation type to look for
 * @return the first matching declared field, or {@code null} if there is none
 */
public static Field getFieldByAnnotation(Object o, Class annotationClass) {
    return Arrays.stream(o.getClass().getDeclaredFields())
            .filter(candidate -> candidate.isAnnotationPresent(annotationClass))
            .findFirst()
            .orElse(null);
}
//endregion
//region getLowLevelClient
/**
 * Returns the low-level REST client that backs the injected high-level client.
 *
 * @return the low-level client obtained from the high-level client
 */
public RestClient getLowLevelClient() {
    RestClient lowLevel = restHighLevelClient.getLowLevelClient();
    return lowLevel;
}
//endregion
//region 高亮结果集 特殊处理
public List
3、配置文件
/config/QLElasticsearchValue.java
package com.msb.es.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * Elasticsearch connection settings bound from configuration properties with the
 * {@code ql.es} prefix.
 *
 * <p>Accessors are provided for every property: JavaBean-style
 * {@code @ConfigurationProperties} binding writes values through setters, so the
 * field-only version of this class could never be populated.
 */
@Component
@ConfigurationProperties(prefix = "ql.es", ignoreInvalidFields = true)
public class QLElasticsearchValue {
    private String esUrl; // cluster URL
    private boolean connectEnabled;
    private String username;
    private String password;
    private boolean userEnabled;
    private int connectTimeOut = 1000; // connect timeout
    private int socketTimeOut = 30000; // socket timeout (the original comment repeated "connect timeout")
    private int connectionRequestTimeOut = 500; // timeout for obtaining a connection
    private int maxConnectNum = 100; // maximum number of connections
    private int maxConnectPerRoute = 100; // maximum number of connections per route

    public String getEsUrl() {
        return esUrl;
    }

    public void setEsUrl(String esUrl) {
        this.esUrl = esUrl;
    }

    public boolean isConnectEnabled() {
        return connectEnabled;
    }

    public void setConnectEnabled(boolean connectEnabled) {
        this.connectEnabled = connectEnabled;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public boolean isUserEnabled() {
        return userEnabled;
    }

    public void setUserEnabled(boolean userEnabled) {
        this.userEnabled = userEnabled;
    }

    public int getConnectTimeOut() {
        return connectTimeOut;
    }

    public void setConnectTimeOut(int connectTimeOut) {
        this.connectTimeOut = connectTimeOut;
    }

    public int getSocketTimeOut() {
        return socketTimeOut;
    }

    public void setSocketTimeOut(int socketTimeOut) {
        this.socketTimeOut = socketTimeOut;
    }

    public int getConnectionRequestTimeOut() {
        return connectionRequestTimeOut;
    }

    public void setConnectionRequestTimeOut(int connectionRequestTimeOut) {
        this.connectionRequestTimeOut = connectionRequestTimeOut;
    }

    public int getMaxConnectNum() {
        return maxConnectNum;
    }

    public void setMaxConnectNum(int maxConnectNum) {
        this.maxConnectNum = maxConnectNum;
    }

    public int getMaxConnectPerRoute() {
        return maxConnectPerRoute;
    }

    public void setMaxConnectPerRoute(int maxConnectPerRoute) {
        this.maxConnectPerRoute = maxConnectPerRoute;
    }
}
/config/RestClientConfig.java
package com.msb.es.config;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
/**
 * Spring configuration that supplies the {@link RestHighLevelClient} bean used by the
 * Elasticsearch support classes.
 */
@Configuration
public class RestClientConfig extends AbstractElasticsearchConfiguration {

    /**
     * Builds a high-level REST client connected to the single address hard-coded below.
     *
     * @return the client created from that configuration
     */
    @Override
    @Bean
    public RestHighLevelClient elasticsearchClient() {
        return RestClients.create(
                ClientConfiguration.builder()
                        .connectedTo("192.168.247.142:9200")
                        .build()
        ).rest();
    }
}
4、调用方代码controller
package com.msb.es;
import com.msb.es.entity.MsbCarInfo;
import com.msb.es.service.ESServiceImpl;
import com.msb.es.util.ESUtil;
import lombok.SneakyThrows;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.elasticsearch.action.index.IndexRequest;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
/**
 * Spring Boot tests that exercise {@link ESUtil} and the raw high-level client.
 * Each test prints its result; none of them asserts on it.
 */
@SpringBootTest
class ESApplicationTests {

    @Resource
    RestHighLevelClient highLevelClient;

    @Resource
    private ESServiceImpl searchServiceImp;

    @Resource
    private ESUtil esUtil;

    /** Indexes one small document directly through the high-level client. */
    @SneakyThrows
    @Test
    public void testCreate() {
        IndexRequest request = new IndexRequest("spring-data");
        request.id("1");
        request.source(singletonMap("feature", "high-level-rest-client"));
        request.setRefreshPolicy(IMMEDIATE);
        highLevelClient.index(request, RequestOptions.DEFAULT);
        System.out.println("s");
    }

    /** Creates the index described by {@code MsbCarInfo}. */
    @Test
    public void createIndex() throws Exception {
        System.out.println(esUtil.createIndex(MsbCarInfo.class));
    }

    /** Deletes the {@code msb_car_info} index. */
    @Test
    public void delIndex() {
        System.out.println(esUtil.delIndex("msb_car_info"));
    }

    /** Runs a bool/term query on {@code name.keyword} and prints the converted hits. */
    @Test
    public void queryDataById() throws Exception {
        // Bool query with a single must clause.
        BoolQueryBuilder boolBuilder = QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery("name.keyword", "大众Fox"));
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
                .from(0)
                .size(5)
                .query(boolBuilder);
        System.out.println(esUtil.search(searchSourceBuilder, MsbCarInfo.class));
    }

    /** Bulk-writes a single {@code MsbCarInfo} through {@code ESUtil.batchSaveOrUpdate}. */
    @Test
    public void batchSaveOrUpdate() throws Exception {
        MsbCarInfo car = new MsbCarInfo();
        car.setId(1);
        car.setName("ES学习代码");
        car.setStatus(1);
        List<MsbCarInfo> batch = new ArrayList<>();
        batch.add(car);
        esUtil.batchSaveOrUpdate(batch, true);
        System.out.println("ES学习");
    }
}
四、项目git自取
es-sample-proj: ES-使用代码项目https://gitee.com/feng-qingxuan/es-sample-proj.git
ES学习步骤:
1、环境部署(单节点)
ES部署+Kibana部署+IK分词器(单节点)_无敌小田田的博客-CSDN博客(es部署+Kibana部署+IK分词器):https://blog.csdn.net/qq_36602951/article/details/121753954
2、基本命令整理
ES DSL命令CRUD整理_无敌小田田的博客-CSDN博客一、CRUD简单使用(1) 创建索引:PUT /index?pretty(2) 删除索引:DELETE /index?pretty(3) 查询索引全部内容:GET /product/_search查询单个id:GET /product/_doc/1查询索引: GET _cat/indices?v 查看索引的状态(4) 插入数据:PUT /index/_doc/id{JSON数据}PUT /product/_doc/1{"name": "张三","aghttps://blog.csdn.net/qq_36602951/article/details/121888259
3、ES+JAVA示例代码
CSDNhttps://mp.csdn.net/mp_blog/creation/editor/121888538



