1.pom依赖引用
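<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.6.2</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>x-pack-transport</artifactId>
    <version>${elasticsearch.version}</version>
</dependency>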
2.yml 参数配置
elasticsearch:
  ansy:
    page-size: 7000
    cluster-name: my-application
    index-prefix: xz
    #用户名及密码
    xpack-security-user: elastic:elastic
    #es集群
    ips:
      - ip: 127.0.0.1
        port: 9200
      #- ip: 127.0.0.1
      #  port: 9300
3.config配置获取参数
package com.example.demo.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Map;
/**
 * Spring-managed holder for the configuration properties bound from the
 * "elasticsearch.ansy" prefix.
 *
 * NOTE(review): this listing is cut off after the last field declaration, so the
 * remaining fields and accessors of the class are not visible here.
 */
@Component
@ConfigurationProperties(prefix = "elasticsearch.ansy")
public class ElasticsearchConfig {
// NOTE(review): presumably named SQL statements; key/value types are not visible here - confirm.
private Map sqls;
// Number of database reader threads
private int pThreadNum;
// Number of ES synchronisation threads
private int cThreadNum;
// Number of rows read from the database per page
private int pageSize;
// Cluster IPs
private List
package com.example.demo.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that exposes the Elasticsearch high-level REST client as a bean.
 */
@Configuration
public class RestHighClientConfig {

    // Injected property holder; the bean factory method below does not read it.
    @Autowired
    private ElasticsearchConfig elasticsearchConfig;

    /**
     * Creates the client registered under the bean name "RestHighLevelClient".
     *
     * NOTE(review): the node address is hard-coded to 127.0.0.1:9200 over http and is
     * not taken from the injected configuration - confirm whether that is intended.
     *
     * @return a client connected to the single hard-coded node
     */
    @Bean(name = "RestHighLevelClient")
    public RestHighLevelClient highClient() {
        final HttpHost node = new HttpHost("127.0.0.1", 9200, "http");
        return new RestHighLevelClient(RestClient.builder(node));
    }
}
4.创建通用查询方法
package com.example.demo.service;
import com.example.demo.enter.User;
import java.util.List;
import java.util.Map;
public interface ElasticSearchService {
boolean createIndex(String index) throws Exception;
boolean existIndex(String index) throws Exception;
boolean deleteIndex(String index) throws Exception;
boolean adddocument(String index, String id, String content) throws Exception;
boolean isExistsdocument(String index, String id) throws Exception;
String getdocument(String index, String id) throws Exception;
boolean updatedocument(String index, String id, String content) throws Exception;
boolean deletedocument(String index, String id) throws Exception;
boolean bulkRequest(String index, List contents) throws Exception;
List> searchRequest(String index, String keyword) throws Exception;
List searchAllRequest(String index) throws Exception;
}
package com.example.demo.service.impl;
import com.example.demo.enter.User;
import com.example.demo.service.ElasticSearchService;
import com.example.demo.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Component
@Slf4j
@Service
public class ElasticSearchServiceImpl implements ElasticSearchService {
@Resource
private RestHighLevelClient restHighLevelClient;
@Override
public boolean createIndex(String index) throws Exception {
// 判断索引是否存在
if(this.existIndex(index)){
return true;
}
// 创建索引
CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
return createIndexResponse.isAcknowledged();
}
@Override
public boolean existIndex(String index) throws Exception {
// 判断索引是否存在
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
return restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
}
@Override
public boolean deleteIndex(String index) throws Exception {
// 判断索引是否存在
if(!this.existIndex(index)){
return true;
}
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
return acknowledgedResponse.isAcknowledged();
}
@Override
public boolean adddocument(String index, String id, String content) throws Exception {
if(!this.createIndex(index)){
return false;
}
IndexRequest indexRequest = new IndexRequest(index);
// 设置超时时间
indexRequest.id(id);
indexRequest.timeout(Timevalue.timevalueSeconds(1000000000));
//indexRequest.type();
// 转换为json字符串
indexRequest.source(content, XContentType.JSON);
IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
return indexResponse.status().getStatus() == 200;
}
@Override
public boolean isExistsdocument(String index, String id) throws Exception {
// 判断是否存在文档
GetRequest getRequest = new GetRequest(index,id);
// 不获取返回的_source的上下文
getRequest.fetchSourceContext(new FetchSourceContext(false));
getRequest.storedFields("_none_");
return restHighLevelClient.exists(getRequest, RequestOptions.DEFAULT);
}
@Override
public String getdocument(String index, String id) throws Exception {
// 获取文档
GetRequest getRequest = new GetRequest(index, id);
GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
return getResponse.getSourceAsString();
}
@Override
public boolean updatedocument(String index, String id, String content) throws Exception {
// 更新文档
UpdateRequest updateRequest = new UpdateRequest(index, id);
updateRequest.timeout(Timevalue.timevalueSeconds(1));
updateRequest.doc(content, XContentType.JSON);
UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
return updateResponse.status().getStatus() == 200;
}
@Override
public boolean deletedocument(String index, String id) throws Exception {
if(!this.isExistsdocument(index, id)){
return true;
}
// 删除文档
DeleteRequest deleteRequest = new DeleteRequest(index, id);
deleteRequest.timeout(Timevalue.timevalueSeconds(1));
DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
return deleteResponse.status().getStatus() == 200;
}
@Override
public boolean bulkRequest(String index, List contents) throws Exception {
// 批量插入
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout(Timevalue.timevalueSeconds(1));
contents.forEach(x -> {
bulkRequest.add(
new IndexRequest(index)
.id(x.getId().toString())
.source(JsonUtils.objectToJson(x), XContentType.JSON));
});
BulkResponse bulkItemResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
return !bulkItemResponse.hasFailures();
}
@Override
public List> searchRequest(String index, String keyword) throws Exception {
// 搜索请求
SearchRequest searchRequest;
if(StringUtils.isEmpty(index)){
searchRequest = new SearchRequest();
}else {
searchRequest = new SearchRequest(index);
}
// 条件构造
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 第几页
searchSourceBuilder.from(0);
// 每页多少条数据
searchSourceBuilder.size(1000);
// 配置高亮
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("name").field("description");
highlightBuilder.preTags("");
highlightBuilder.postTags("");
searchSourceBuilder.highlighter(highlightBuilder);
// 精确查询
// QueryBuilders.termQuery();
// 匹配所有
// QueryBuilders.matchAllQuery();
// 最细粒度划分:ik_max_word,最粗粒度划分:ik_smart
//multiMatchQuery多字段匹配查询
searchSourceBuilder.query(QueryBuilders.multiMatchQuery(keyword,"name", "description","id"));
// matchQuery 单字段匹配一个值
// searchSourceBuilder.query(QueryBuilders.matchQuery("content", keyWord));
searchSourceBuilder.timeout(Timevalue.timevalueSeconds(10));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
List> results = new ArrayList<>();
for (SearchHit searchHit : searchResponse.getHits().getHits()){
Map highlightFieldMap = searchHit.getHighlightFields();
HighlightField title = highlightFieldMap.get("name");
HighlightField description = highlightFieldMap.get("description");
// 原来的结果
Map sourceMap = searchHit.getSourceAsMap();
// 解析高亮字段,替换掉原来的字段
if (title != null){
Text[] fragments = title.getFragments();
StringBuilder n_title = new StringBuilder();
for (Text text : fragments){
n_title.append(text);
}
sourceMap.put("name", n_title.toString());
}
if (description != null){
Text[] fragments = description.getFragments();
StringBuilder n_description = new StringBuilder();
for (Text text : fragments){
n_description.append(text);
}
sourceMap.put("description", n_description.toString());
}
results.add(sourceMap);
}
return results;
}
@Override
public List searchAllRequest(String index) throws Exception {
// 搜索请求
SearchRequest searchRequest;
if(StringUtils.isEmpty(index)){
searchRequest = new SearchRequest();
}else {
searchRequest = new SearchRequest(index);
}
// 条件构造
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
// 第几页
searchSourceBuilder.from(0);
// 每页多少条数据
searchSourceBuilder.size(1000);
// 匹配所有
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchSourceBuilder.timeout(Timevalue.timevalueSeconds(10));
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
List results = new ArrayList<>();
for (SearchHit searchHit : searchResponse.getHits().getHits()){
results.add(Integer.valueOf(searchHit.getId()));
}
return results;
}
}
5.创建测试实体类
package com.example.demo.enter;
import lombok.Data;
/**
 * Test entity used by the Elasticsearch examples; accessors are generated by Lombok's
 * {@code @Data}.
 */
@Data
public class User {
// Identifier of the user
private Long id;
// Name of the user
private String name;
// Free-text description of the user
private String description;
// Age of the user
private int age;
}
6.创建定时任务批量导入数据
package com.example.demo.scheduled;
import com.example.demo.service.ElasticSearchService;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.CollectionUtils;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Scheduled job that periodically queries Elasticsearch and prints/logs the result.
 */
@EnableScheduling
@Configuration
@Slf4j
public class insertElasticSearchTask {

    // Gson instances are thread-safe, so one shared instance is enough.
    private static final Gson GSON = new Gson();

    @Autowired
    private ElasticSearchService elasticSearchService;

    /**
     * Searches "test_index" for the keyword "测试" and writes the hits, serialised as
     * JSON, to standard output and to the log.
     *
     * NOTE(review): both triggers below are active, so the job runs every minute
     * (cron) and additionally every 5 seconds (fixedRate) - confirm whether one of
     * them was meant to be disabled.
     *
     * @throws Exception any failure raised by the search service
     */
    @Scheduled(cron = "0 */1 * * * ?")
    @Scheduled(fixedRate = 5000)
    public void insertData() throws Exception {
        List<Map<String, Object>> hits = elasticSearchService.searchRequest("test_index", "测试");
        String json = GSON.toJson(hits);
        System.out.println("***********" + json + "************");
        log.info("*****定时任务*****{}************", json);
    }
}
7.添加接口测试
package com.example.demo.controller;
import com.example.demo.common.Result;
import com.example.demo.service.ElasticSearchService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@RestController
@Api(tags = "ES查询")
@RequestMapping("file")
@Slf4j
public class ElasticSearchController {
@Autowired
private ElasticSearchService elasticSearchService;
@ApiOperation(value = "Es查询",notes = "***",httpMethod = "POST")
@ApiImplicitParams({
@ApiImplicitParam(name = "index", value = "index", required = true, dataType = "String", paramType = "query"),
@ApiImplicitParam(name = "keyword", value = "关键字", required = false, dataType = "int", paramType = "query")
})
@PostMapping(value = "/getElasticSearchPost")
public Result getElasticSearchPost(@RequestParam(required = false) Map params){
try {
List> list = elasticSearchService.searchRequest((String) params.get("index"),(String) params.get("keyword"));
return Result.succeed(list,"success");
}catch (Exception e){
return Result.failed("查询异常");
}
}
@ApiOperation(value = "Es查询",notes = "***",httpMethod = "GET")
@ApiImplicitParams({
@ApiImplicitParam(name = "index", value = "index", required = true, dataType = "String", paramType = "query"),
@ApiImplicitParam(name = "keyword", value = "关键字", required = false, dataType = "String", paramType = "query")
})
@GetMapping(value = "/getElasticSearchGet")
public Result getElasticSearchGet(@RequestParam(required = false) Map params){
try {
List> list = elasticSearchService.searchRequest((String) params.get("index"),(String) params.get("keyword"));
Map map = new HashMap<>();
return Result.succeed(list,"success");
}catch (Exception e){
return Result.failed("查询异常");
}
}
}



