栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot集成ElasticSearch使用RestHighLevelClient连接客户端

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot集成ElasticSearch使用RestHighLevelClient连接客户端

ES查询结构图


1.pom依赖引用

        
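<dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>x-pack-transport</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>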
        

2.yml 参数配置

# Settings under "elasticsearch.ansy"; this prefix matches the
# @ConfigurationProperties(prefix = "elasticsearch.ansy") class shown below.
elasticsearch:
  ansy:
    page-size: 7000
    cluster-name: my-application
    # NOTE(review): no matching field is visible in ElasticsearchConfig for this key - confirm it is used.
    index-prefix: xz
    # Username and password (presumably in "user:password" form - confirm)
    xpack-security-user: elastic:elastic
    # Elasticsearch cluster nodes, one ip/port pair per entry
    ips:
      - ip: 127.0.0.1
        port: 9200
      # Example of an additional node entry, currently disabled:
      #- ip: 127.0.0.1
      #  port: 9300

3.config配置获取参数

package com.example.demo.config;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

@Component
@ConfigurationProperties(prefix = "elasticsearch.ansy")
public class ElasticsearchConfig {
    private Map sqls;

    // 数据库读取线程数
    private int pThreadNum;

    // ES同步线程数
    private int cThreadNum;

    //数据库读取条数
    private int pageSize;

    //集群IP
    private List> ips;

    private String clusterName;

    private String timepath;
    //ESXpack用户名密码
    private String xpackSecurityUser;
    //ESXpack instance.key
    private String xpackSslKey;
    //ESXpack instance.crt
    private String xpackSslCertificate;
    //ESXpack ca.crt
    private String xpackSslCertificateauthorities;
    //ESXpack xpack.security.transport.ssl.verification_mode certificate
    private String xpackSslVerificationmode;
    //ESXpack xpack.security.transport.ssl.enabled
    private String xpackSslEnabled;

    public Map getSqls() {
        return sqls;
    }

    public void setSqls(Map sqls) {
        this.sqls = sqls;
    }

    public int getpThreadNum() {
        return pThreadNum;
    }

    public void setpThreadNum(int pThreadNum) {
        this.pThreadNum = pThreadNum;
    }

    public int getcThreadNum() {
        return cThreadNum;
    }

    public void setcThreadNum(int cThreadNum) {
        this.cThreadNum = cThreadNum;
    }

    public int getPageSize() {
        return pageSize;
    }

    public void setPageSize(int pageSize) {
        this.pageSize = pageSize;
    }

    public List> getIps() {
        return ips;
    }

    public void setIps(List> ips) {
        this.ips = ips;
    }

    public String getClusterName() {
        return clusterName;
    }

    public void setClusterName(String clusterName) {
        this.clusterName = clusterName;
    }

    public String getTimepath() {
        return timepath;
    }

    public void setTimepath(String timepath) {
        this.timepath = timepath;
    }

    public String getXpackSecurityUser() {
        return xpackSecurityUser;
    }

    public void setXpackSecurityUser(String xpackSecurityUser) {
        this.xpackSecurityUser = xpackSecurityUser;
    }

    public String getXpackSslKey() {
        return xpackSslKey;
    }

    public void setXpackSslKey(String xpackSslKey) {
        this.xpackSslKey = xpackSslKey;
    }

    public String getXpackSslCertificate() {
        return xpackSslCertificate;
    }

    public void setXpackSslCertificate(String xpackSslCertificate) {
        this.xpackSslCertificate = xpackSslCertificate;
    }

    public String getXpackSslCertificateauthorities() {
        return xpackSslCertificateauthorities;
    }

    public void setXpackSslCertificateauthorities(String xpackSslCertificateauthorities) {
        this.xpackSslCertificateauthorities = xpackSslCertificateauthorities;
    }

    public String getXpackSslVerificationmode() {
        return xpackSslVerificationmode;
    }

    public void setXpackSslVerificationmode(String xpackSslVerificationmode) {
        this.xpackSslVerificationmode = xpackSslVerificationmode;
    }

    public String getXpackSslEnabled() {
        return xpackSslEnabled;
    }

    public void setXpackSslEnabled(String xpackSslEnabled) {
        this.xpackSslEnabled = xpackSslEnabled;
    }
}

package com.example.demo.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Registers the {@link RestHighLevelClient} bean used to talk to Elasticsearch.
 *
 * <p>The node list is taken from {@link ElasticsearchConfig#getIps()} instead of being
 * hard-coded, so the {@code ips} entries in the application configuration take effect.
 */
@Configuration
public class RestHighClientConfig {

    /** Scheme used for every node; the original listing only ever connected over plain HTTP. */
    private static final String SCHEME = "http";

    /** Node used when no {@code ips} entry is configured (the previously hard-coded endpoint). */
    private static final String DEFAULT_HOST = "127.0.0.1";
    private static final int DEFAULT_PORT = 9200;

    @Autowired
    private ElasticsearchConfig elasticsearchConfig;

    /**
     * Builds the high-level REST client for all configured nodes.
     *
     * @return a client connected to every configured node, or to
     *         {@code 127.0.0.1:9200} when none is configured
     */
    @Bean(name = "RestHighLevelClient")
    public RestHighLevelClient highClient() {
        return new RestHighLevelClient(RestClient.builder(resolveHosts()));
    }

    /**
     * Converts the configured {@code ip}/{@code port} entries into {@link HttpHost}s.
     *
     * @throws NumberFormatException if an entry has a missing or non-numeric {@code port}
     */
    private HttpHost[] resolveHosts() {
        if (elasticsearchConfig.getIps() == null || elasticsearchConfig.getIps().isEmpty()) {
            return new HttpHost[] {new HttpHost(DEFAULT_HOST, DEFAULT_PORT, SCHEME)};
        }
        // String.valueOf keeps this working whether the binder stored the port as text or a number.
        return elasticsearchConfig.getIps().stream()
                .map(node -> new HttpHost(
                        String.valueOf(node.get("ip")),
                        Integer.parseInt(String.valueOf(node.get("port"))),
                        SCHEME))
                .toArray(HttpHost[]::new);
    }
}

4.创建通用查询方法

package com.example.demo.service;

import com.example.demo.enter.User;

import java.util.List;
import java.util.Map;

public interface ElasticSearchService {
    
    boolean createIndex(String index) throws Exception;

    
    boolean existIndex(String index) throws Exception;

    
    boolean deleteIndex(String index) throws Exception;

    
    boolean adddocument(String index, String id, String content) throws Exception;

    
    boolean isExistsdocument(String index, String id) throws Exception;

    
    String getdocument(String index, String id) throws Exception;

    
    boolean updatedocument(String index, String id, String content) throws Exception;

    
    boolean deletedocument(String index, String id) throws Exception;

    
    boolean bulkRequest(String index, List contents) throws Exception;

    
    List> searchRequest(String index, String keyword) throws Exception;

    
    List searchAllRequest(String index) throws Exception;
}

package com.example.demo.service.impl;

import com.example.demo.enter.User;
import com.example.demo.service.ElasticSearchService;
import com.example.demo.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;


@Component
@Slf4j
@Service
public class ElasticSearchServiceImpl implements ElasticSearchService {

    @Resource
    private RestHighLevelClient restHighLevelClient;

    
    @Override
    public boolean createIndex(String index) throws Exception {
        // 判断索引是否存在
        if(this.existIndex(index)){
            return true;
        }
        // 创建索引
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
        CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
        return createIndexResponse.isAcknowledged();
    }

    
    @Override
    public boolean existIndex(String index) throws Exception {
        // 判断索引是否存在
        GetIndexRequest getIndexRequest = new GetIndexRequest(index);
        return restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
    }

    
    @Override
    public boolean deleteIndex(String index) throws Exception {
        // 判断索引是否存在
        if(!this.existIndex(index)){
            return true;
        }
        DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(index);
        AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
        return acknowledgedResponse.isAcknowledged();
    }

    
    @Override
    public boolean adddocument(String index, String id, String content) throws Exception {
        if(!this.createIndex(index)){
            return false;
        }

        IndexRequest indexRequest = new IndexRequest(index);
        // 设置超时时间
        indexRequest.id(id);
        indexRequest.timeout(Timevalue.timevalueSeconds(1000000000));
        //indexRequest.type();
        // 转换为json字符串
        indexRequest.source(content, XContentType.JSON);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        return indexResponse.status().getStatus() == 200;
    }

    
    @Override
    public boolean isExistsdocument(String index, String id) throws Exception {
        // 判断是否存在文档
        GetRequest getRequest = new GetRequest(index,id);
        // 不获取返回的_source的上下文
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        return restHighLevelClient.exists(getRequest, RequestOptions.DEFAULT);
    }

    
    @Override
    public String getdocument(String index, String id) throws Exception {
        // 获取文档
        GetRequest getRequest = new GetRequest(index, id);
        GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        return getResponse.getSourceAsString();
    }

    
    @Override
    public boolean updatedocument(String index, String id, String content) throws Exception {
        // 更新文档
        UpdateRequest updateRequest = new UpdateRequest(index, id);
        updateRequest.timeout(Timevalue.timevalueSeconds(1));
        updateRequest.doc(content, XContentType.JSON);
        UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        return updateResponse.status().getStatus() == 200;
    }

    
    @Override
    public boolean deletedocument(String index, String id) throws Exception {
        if(!this.isExistsdocument(index, id)){
            return true;
        }

        // 删除文档
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        deleteRequest.timeout(Timevalue.timevalueSeconds(1));
        DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        return deleteResponse.status().getStatus() == 200;
    }

    
    @Override
    public boolean bulkRequest(String index, List contents) throws Exception {
        // 批量插入 
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(Timevalue.timevalueSeconds(1));
        contents.forEach(x -> {
            bulkRequest.add(
                    new IndexRequest(index)
                            .id(x.getId().toString())
                            .source(JsonUtils.objectToJson(x), XContentType.JSON));
        });
        BulkResponse bulkItemResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        return !bulkItemResponse.hasFailures();
    }

    
    @Override
    public List> searchRequest(String index, String keyword) throws Exception {
        // 搜索请求
        SearchRequest searchRequest;
        if(StringUtils.isEmpty(index)){
            searchRequest = new SearchRequest();
        }else {
            searchRequest = new SearchRequest(index);
        }
        // 条件构造
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        // 第几页
        searchSourceBuilder.from(0);
        // 每页多少条数据
        searchSourceBuilder.size(1000);
        // 配置高亮
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.field("name").field("description");
        highlightBuilder.preTags("");
        highlightBuilder.postTags("");
        searchSourceBuilder.highlighter(highlightBuilder);
        // 精确查询
//        QueryBuilders.termQuery();
        // 匹配所有
//        QueryBuilders.matchAllQuery();
        // 最细粒度划分:ik_max_word,最粗粒度划分:ik_smart
        //multiMatchQuery多字段匹配查询
        searchSourceBuilder.query(QueryBuilders.multiMatchQuery(keyword,"name", "description","id"));
        // matchQuery 单字段匹配一个值
//        searchSourceBuilder.query(QueryBuilders.matchQuery("content", keyWord));
        searchSourceBuilder.timeout(Timevalue.timevalueSeconds(10));

        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        List> results = new ArrayList<>();
        for (SearchHit searchHit : searchResponse.getHits().getHits()){
            Map highlightFieldMap = searchHit.getHighlightFields();
            HighlightField title = highlightFieldMap.get("name");
            HighlightField description = highlightFieldMap.get("description");
            // 原来的结果
            Map sourceMap = searchHit.getSourceAsMap();
            // 解析高亮字段,替换掉原来的字段
            if (title != null){
                Text[] fragments = title.getFragments();
                StringBuilder n_title = new StringBuilder();
                for (Text text : fragments){
                    n_title.append(text);
                }
                sourceMap.put("name", n_title.toString());
            }
            if (description != null){
                Text[] fragments = description.getFragments();
                StringBuilder n_description = new StringBuilder();
                for (Text text : fragments){
                    n_description.append(text);
                }
                sourceMap.put("description", n_description.toString());
            }
            results.add(sourceMap);
        }
        return results;
    }

    
    @Override
    public List searchAllRequest(String index) throws Exception {
        // 搜索请求
        SearchRequest searchRequest;
        if(StringUtils.isEmpty(index)){
            searchRequest = new SearchRequest();
        }else {
            searchRequest = new SearchRequest(index);
        }
        // 条件构造
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        // 第几页
        searchSourceBuilder.from(0);
        // 每页多少条数据
        searchSourceBuilder.size(1000);
        // 匹配所有
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        searchSourceBuilder.timeout(Timevalue.timevalueSeconds(10));

        searchRequest.source(searchSourceBuilder);

        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

        List results = new ArrayList<>();
        for (SearchHit searchHit : searchResponse.getHits().getHits()){
            results.add(Integer.valueOf(searchHit.getId()));
        }
        return results;
    }
}

5.创建测试实体类

package com.example.demo.enter;

import lombok.Data;


/**
 * Test entity for the Elasticsearch examples.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}.
 */
@Data
public class User {
    // Identifier of the user.
    private Long id;
    // Display name.
    private String name;
    // Free-text description.
    private String description;
    // Age of the user.
    private int age;
}

6.创建定时任务批量导入数据

package com.example.demo.scheduled;

import com.example.demo.service.ElasticSearchService;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.CollectionUtils;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * Scheduled job that queries {@code test_index} for the keyword {@code 测试} and logs the
 * hits as JSON.
 */
@EnableScheduling
@Configuration
@Slf4j
public class insertElasticSearchTask {

    /** Shared serializer; created once instead of on every run. */
    private static final Gson GSON = new Gson();

    @Autowired
    private ElasticSearchService elasticSearchService;

    /**
     * Runs the sample search and logs its result.
     *
     * <p>NOTE(review): both triggers below are active, so the job fires every minute
     * <em>and</em> every five seconds - confirm whether only one of them was intended.
     *
     * @throws Exception if the search fails
     */
    @Scheduled(cron = "0 */1 * * * ?")
    @Scheduled(fixedRate = 5000)
    public void insertData() throws Exception {
        List<Map<String, Object>> hits = elasticSearchService.searchRequest("test_index", "测试");
        // Serialise once and go through the logger rather than printing to stdout as well.
        String json = GSON.toJson(hits);
        log.info("*****定时任务*****{}************", json);
    }
}

7.添加接口测试

package com.example.demo.controller;

import com.example.demo.common.Result;
import com.example.demo.service.ElasticSearchService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


@RestController
@Api(tags = "ES查询")
@RequestMapping("file")
@Slf4j
public class ElasticSearchController {

    @Autowired
    private ElasticSearchService elasticSearchService;


    @ApiOperation(value = "Es查询",notes = "***",httpMethod = "POST")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "index", value = "index", required = true, dataType = "String", paramType = "query"),
            @ApiImplicitParam(name = "keyword", value = "关键字", required = false, dataType = "int", paramType = "query")
    })
    @PostMapping(value = "/getElasticSearchPost")
    public Result getElasticSearchPost(@RequestParam(required = false) Map params){
        try {
            List> list = elasticSearchService.searchRequest((String) params.get("index"),(String) params.get("keyword"));
            return Result.succeed(list,"success");
        }catch (Exception e){
            return Result.failed("查询异常");
        }
    }

    @ApiOperation(value = "Es查询",notes = "***",httpMethod = "GET")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "index", value = "index", required = true, dataType = "String", paramType = "query"),
            @ApiImplicitParam(name = "keyword", value = "关键字", required = false, dataType = "String", paramType = "query")
    })
    @GetMapping(value = "/getElasticSearchGet")
    public Result getElasticSearchGet(@RequestParam(required = false) Map params){
        try {
            List> list = elasticSearchService.searchRequest((String) params.get("index"),(String) params.get("keyword"));
            Map map = new HashMap<>();
            return Result.succeed(list,"success");
        }catch (Exception e){
            return Result.failed("查询异常");
        }
    }

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/490751.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号