栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Elasticsearch6.4.3——SpringBoot2.1.x整合

Elasticsearch6.4.3——SpringBoot2.1.x整合

1、添加依赖。根据项目需要,这里排除了 jackson 的三个依赖,也可以不排除。

 
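        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.4.3</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.dataformat</groupId>
                    <artifactId>jackson-dataformat-smile</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.dataformat</groupId>
                    <artifactId>jackson-dataformat-yaml</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.dataformat</groupId>
                    <artifactId>jackson-dataformat-cbor</artifactId>
                </exclusion>
            </exclusions>
        </dependency>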

2、application.yml配置文件

elastic:
  es-host: 127.0.0.1 # replace with the public IP of the server where Elasticsearch is installed
  es-port: 9200
  user-name: elastic
  password: elastic123456
  connect-count: 50
  connect-per-route: 10

3、EsProperties配置读取类

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Holds the Elasticsearch connection settings read from configuration
 * properties under the {@code elastic} prefix (see {@link #PREFIX}).
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = EsProperties.PREFIX)
public class EsProperties {

    /** Property prefix used by {@code @ConfigurationProperties} on this class. */
    public static final String PREFIX = "elastic";

    /** Host name or IP address of the Elasticsearch node. */
    private String esHost;

    /** HTTP port of the Elasticsearch node. */
    private Integer esPort;

    /** User name used for HTTP basic authentication against Elasticsearch. */
    private String userName;

    /** Password used for HTTP basic authentication against Elasticsearch. */
    private String password;

    /** Maximum total number of HTTP connections the client may open. */
    private Integer connectCount;

    /** Maximum number of HTTP connections the client may open per route. */
    private Integer connectPerRoute;

}

4、添加EsClient工厂类EsClientSpringFactory

import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;

@Slf4j
public class EsClientSpringFactory {

    private static final int CONNECT_TIMEOUT_MILLIS = 1000;
    private static final int SOCKET_TIMEOUT_MILLIS = 30000;
    private static final int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;

    private static int MAX_CONN_PER_ROUTE;
    private static int MAX_CONN_TOTAL;
    private static HttpHost HTTP_HOST;
    private static String USER_NAME;
    private static String PASSWORD;

    private RestClientBuilder builder;
    private RestClient restClient;
    private RestHighLevelClient restHighLevelClient;

    private static final EsClientSpringFactory ES_CLIENT_SPRING_FACTORY = new EsClientSpringFactory();

    private EsClientSpringFactory() {
    }

    public static EsClientSpringFactory build(HttpHost esHost, String userName, String pwd, Integer maxConnectNum, Integer maxConnectPerRoute) {
        HTTP_HOST = esHost;
        USER_NAME = userName;
        PASSWORD = pwd;
        MAX_CONN_TOTAL = maxConnectNum;
        MAX_CONN_PER_ROUTE = maxConnectPerRoute;
        return ES_CLIENT_SPRING_FACTORY;
    }


    public void init() {
        builder = RestClient.builder(HTTP_HOST);
        setConnectTimeOutConfig();
        setMuftiConnectConfig();
        restClient = builder.build();
        restHighLevelClient = new RestHighLevelClient(builder);
    }

    
    public void setConnectTimeOutConfig() {
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
            requestConfigBuilder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
            requestConfigBuilder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
            return requestConfigBuilder;
        });
    }

    
    public void setMuftiConnectConfig() {
        //连接Elasticsearch集群
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(USER_NAME, PASSWORD));
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(MAX_CONN_TOTAL);
            httpClientBuilder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            return httpClientBuilder;
        });
    }

    public RestClient getClient() {
        return restClient;
    }

    public RestHighLevelClient getRhlClient() {
        return restHighLevelClient;
    }

    public void close() {
        if (restClient != null) {
            try {
                restClient.close();
            } catch (IOException e) {
                log.error(e.getMessage());
            }
        }
        log.info("ES客户端实例关闭");

    }
}

5、创建连接EsConnectionBean

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * Spring configuration that exposes the Elasticsearch host, the client factory and both
 * REST clients as beans, using the values held by {@link EsProperties}.
 */
@Configuration
@ComponentScan(basePackageClasses = EsClientSpringFactory.class)
public class EsConnectionBean {

    @Resource
    private EsProperties esConnectionProperties;

    /**
     * @return the Elasticsearch node address built from the configured host and port,
     *         using the {@code http} scheme
     */
    @Bean
    public HttpHost httpHost() {
        final String hostName = esConnectionProperties.getEsHost();
        final int port = esConnectionProperties.getEsPort();
        return new HttpHost(hostName, port, "http");
    }

    /**
     * Configures the shared client factory. The bean declares {@code init} as its
     * init method and {@code close} as its destroy method.
     *
     * @return the configured factory
     */
    @Bean(initMethod = "init", destroyMethod = "close")
    public EsClientSpringFactory getFactory() {
        final HttpHost host = httpHost();
        final String userName = esConnectionProperties.getUserName();
        final String password = esConnectionProperties.getPassword();
        final Integer maxTotal = esConnectionProperties.getConnectCount();
        final Integer maxPerRoute = esConnectionProperties.getConnectPerRoute();
        return EsClientSpringFactory.build(host, userName, password, maxTotal, maxPerRoute);
    }

    /**
     * @return the low-level REST client held by the factory
     */
    @Bean
    public RestClient getRestClient() {
        final EsClientSpringFactory factory = getFactory();
        return factory.getClient();
    }

    /**
     * @return the high-level REST client held by the factory
     */
    @Bean
    public RestHighLevelClient getRhlClient() {
        final EsClientSpringFactory factory = getFactory();
        return factory.getRhlClient();
    }


}

6、实现类EsServiceImpl

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.stereotype.Service;
import tech.honghu.common.exceptions.BizException;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;

@Slf4j
@Service
public class EsServiceImpl implements EsService {

    @Resource
    private RestHighLevelClient restHighLevelClient;

    @Override
    public ResultWrapper createIndex(String index, String settings, String mappings) {
        try {
            CreateIndexResponse createIndexResponse = null;
            if (!existsIndex(index)) {
                CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
                if (StringUtils.isNotBlank(settings)) {
                    buildIndexSetting(createIndexRequest, settings);
                }
                //buildIndexMapping(createIndexRequest, mappings);
                createIndexRequest.settings();
                createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
            }
            return new ResultWrapper().success(createIndexResponse == null ? "索引已存在" : JSON.toJSONString(createIndexResponse));
        } catch (Exception e) {
            log.error("索引创建失败", e);
            return new ResultWrapper().fail(e.getMessage());
        }
    }

    @Override
    public void buildIndexSetting(CreateIndexRequest createIndexRequest, String setting) {
        createIndexRequest.settings(setting, XContentType.JSON);
    }

    @Override
    public void buildIndexMapping(CreateIndexRequest createIndexRequest, String mapping) {
        createIndexRequest.mapping(mapping, XContentType.JSON);
    }

    @Override
    public ResultWrapper deleteIndex(String indexName) {
        try {
            if (!existsIndex(indexName)) {
                return new ResultWrapper().success("索引不存在");
            }
            DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName);
            restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT);
            return new ResultWrapper().success("索引删除成功");
        } catch (Exception e) {
            log.error("索引删除失败", e);
            return new ResultWrapper().fail(e.getMessage());
        }
    }

    @Override
    public ResultWrapper createIndexDoc(IndexDocDTO indexDocDto) {
        IndexRequest request = new IndexRequest();
        request.index(indexDocDto.getIndex());
        request.type(indexDocDto.getType());
        request.id(StringUtils.isNotBlank(indexDocDto.getId()) ? indexDocDto.getId() : UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
        request.source(indexDocDto.getSource(), XContentType.JSON);
        try {
            IndexResponse index = restHighLevelClient.index(request, RequestOptions.DEFAULT);
            return new ResultWrapper().success(JSON.toJSONString(index));
        } catch (IOException e) {
            log.error("创建文档失败", e);
            return new ResultWrapper().fail(e.getMessage());
        }
    }


    @Override
    public ResultWrapper updateIndexDoc(IndexDocDTO indexDocDto) {
        try {
            UpdateRequest updateRequest = new UpdateRequest(indexDocDto.getIndex(), indexDocDto.getType(), indexDocDto.getId());
            updateRequest.doc(indexDocDto.getSource(), XContentType.JSON);
            UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
            return new ResultWrapper().success(JSON.toJSONString(updateResponse));
        } catch (Exception e) {
            log.error("更新文档失败", e);
            return new ResultWrapper().fail(e.getMessage());
        }
    }

    @Override
    public ResultWrapper deleteIndexDoc(IndexDocDTO indexDocDto) {
        try {
            DeleteRequest deleteRequest = new DeleteRequest(indexDocDto.getIndex(), indexDocDto.getType(), indexDocDto.getId());
            DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
            return new ResultWrapper().success(JSON.toJSONString(deleteResponse));
        } catch (IOException e) {
            log.error("删除文档失败", e);
            return new ResultWrapper().fail(e.getMessage());
        }
    }

    
    @Override
    public Boolean existsIndex(String index) {
        GetIndexRequest request = new GetIndexRequest();
        request.indices(index);
        Boolean flag = false;
        try {
            flag = restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }


    @Override
    public ResultWrapper bulkInsert(List IndexDocDTOList) {
        try {
            if (CollectionUtils.isEmpty(IndexDocDTOList)) {
                return new ResultWrapper().fail("插入数据为空");
            }
            BulkRequest bulkAddRequest = new BulkRequest();
            for (IndexDocDTO IndexDocDTO : IndexDocDTOList) {
                IndexRequest indexRequest = new IndexRequest(IndexDocDTO.getIndex());
                indexRequest.id(StringUtils.isNotBlank(IndexDocDTO.getId()) ? IndexDocDTO.getId() : UUID.randomUUID().toString().replaceAll("-", "").toUpperCase());
                indexRequest.type(IndexDocDTO.getType());
                indexRequest.source(IndexDocDTO.getSource(), XContentType.JSON);
                bulkAddRequest.add(indexRequest);
            }
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkAddRequest, RequestOptions.DEFAULT);
            return new ResultWrapper().success(JSON.toJSONString(bulkResponse));
        } catch (Exception e) {
            log.error("批量添加文档失败", e);
            return new ResultWrapper().fail(e.getMessage());
        }
    }

    @Override
    public ResultWrapper shoppingUserOrderSearch(SearchDocDTO searchDocDTO) {
        String search = validateSearch(searchDocDTO);
        if (StringUtils.isNotBlank(search)) {
            return new ResultWrapper().fail(search);
        }
        try {
            int pageNo = searchDocDTO.getPage();
            int pageSize = searchDocDTO.getPageSize();
            int startIndex = (pageNo - 1) * pageSize;
            //获取索引名称
            String index = searchDocDTO.getIndex();
            IPage> page = new Page<>(pageNo, pageSize);
            List> searchList;
            SearchRequest searchRequest = new SearchRequest(index);
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            String[] fieldNames = getQueryFieldNames(index, searchDocDTO.getType());
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
            //精确匹配
            if (StringUtils.isNotBlank(searchDocDTO.getMerchantId())) {
                boolQueryBuilder.must(QueryBuilders.matchPhraseQuery("merchantId", searchDocDTO.getMerchantId()));
            }
            if (StringUtils.isNotBlank(searchDocDTO.getUserId())) {
                boolQueryBuilder.must(QueryBuilders.matchPhraseQuery("userId", searchDocDTO.getUserId()));
            }
            //模糊匹配
            if (StringUtils.isNotBlank(searchDocDTO.getKeywords())) {
                boolQueryBuilder.must(QueryBuilders.multiMatchQuery(searchDocDTO.getKeywords(), fieldNames).maxExpansions(10));
            }
            sourceBuilder.query(boolQueryBuilder);
            sourceBuilder.from(startIndex);
            sourceBuilder.size(pageSize);
            if (searchDocDTO.getIsHighlight()) {
                HighlightBuilder highlightBuilder = new HighlightBuilder();
                Arrays.stream(fieldNames).forEach(e -> {
                            highlightBuilder.field(e).highlighterType("unified").preTags("").postTags("");
                        }
                );
                sourceBuilder.highlighter(highlightBuilder);
            }
            searchRequest.source(sourceBuilder);
            SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            RestStatus restStatus = searchResponse.status();
            if (RestStatus.OK.equals(restStatus)) {
                SearchHits hits = searchResponse.getHits();
                long totalHits = hits.getTotalHits();
                SearchHit[] searchHits = hits.getHits();
                searchList = getSearchList(searchHits, searchDocDTO.getIsHighlight());
                page.setRecords(searchList);
                page.setTotal(totalHits);
            }
            return new ResultWrapper().success(page);
        } catch (Exception e) {
            log.error("查询失败", e);
            return new ResultWrapper().fail(e.getMessage());
        }
    }

    private String validateSearch(SearchDocDTO searchDocDTO) {
        if (searchDocDTO.getPage() == null || searchDocDTO.getPageSize() == null) {
            return "分页条件不能为空";
        }
        if (StringUtils.isBlank(searchDocDTO.getIndex())) {
            return "索引名不能为空";
        }
        if (StringUtils.isBlank(searchDocDTO.getType())) {
            return "索引类型不能为空";
        }
        return null;
    }

    @Override
    public String[] getAllFieldNames(String index, String type) {
        try {
            GetIndexRequest getIndexRequest = new GetIndexRequest();
            getIndexRequest.indices(index);
            GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(getIndexRequest, RequestOptions.DEFAULT);
            Map mappingmetaData = getIndexResponse.getMappings().get(index).get(type).getSourceAsMap();
            linkedHashMap linkedHashMap = (linkedHashMap) (mappingmetaData.get("properties"));
            return linkedHashMap.keySet().toArray(new String[linkedHashMap.keySet().size()]);
        } catch (Exception e) {
            throw new BizException(e.getMessage());
        }
    }

    @Override
    public String[] getQueryFieldNames(String index, String type) {
        try {
            GetIndexRequest indexRequest = new GetIndexRequest();
            indexRequest.indices(index);
            GetIndexResponse getIndexResponse = restHighLevelClient.indices().get(indexRequest, RequestOptions.DEFAULT);
            Map map = getIndexResponse.getMappings().get(index).get(type).getSourceAsMap();
            linkedHashMap> linkedHashMap = (linkedHashMap>) map.get("properties");
            List fieldNames = new ArrayList<>();
            for (Map.Entry> entry : linkedHashMap.entrySet()) {
                String key = entry.getKey();
                linkedHashMap linkedValue = entry.getValue();
                String typevalue = (String) linkedValue.get("type");
                if (typevalue != null) {
                    if ("text".equals(typevalue)) {
                        fieldNames.add(key);
                    }
                }
            }
            int listSize = fieldNames.size();
            return fieldNames.toArray(new String[listSize]);
        } catch (Exception e) {
            throw new BizException(e.getMessage());
        }
    }

    @Override
    public List> getSearchList(SearchHit[] searchHits, Boolean isHighlight) {
        List> searchList = new ArrayList<>(16);
        for (SearchHit hit : searchHits) {
            Map sourceAsMap = hit.getSourceAsMap();
            if (isHighlight) {
                //取高亮结果
                Map highlightFields = hit.getHighlightFields();
                for (Map.Entry entry : highlightFields.entrySet()) {
                    String key = entry.getKey();
                    HighlightField highlight = entry.getValue();
                    if (highlight != null) {
                        Text[] fragments = highlight.fragments();
                        if (fragments != null) {
                            String fragmentString = fragments[0].string();
                            sourceAsMap.put(key, fragmentString);
                        }
                    }
                }
            }
            searchList.add(sourceAsMap);
        }
        return searchList;
    }

}

7、IndexDocDTO类

import com.alibaba.fastjson.JSONObject;
import lombok.Data;

/**
 * Describes a single document for the index, update, delete and bulk operations
 * of {@code EsServiceImpl}. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class IndexDocDTO {

    /** Name of the index the document belongs to. */
    private String index;

    /** Document id; when blank, the create and bulk operations generate a random id. */
    private String id;

    /** Mapping type of the document. */
    private String type;

    /** Document content, passed to Elasticsearch as JSON. */
    private JSONObject source;

}

8、SearchDocDTO类

import lombok.Data;

/**
 * Search conditions consumed by {@code EsServiceImpl.shoppingUserOrderSearch}.
 * Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class SearchDocDTO {

    /** When not blank, results must phrase-match this value on the {@code userId} field. */
    private  String userId;

    /** When not blank, results must phrase-match this value on the {@code merchantId} field. */
    private String merchantId;

    /** NOTE(review): not read by the visible search code — presumably a document id; confirm. */
    private String id;

    /** Mapping type; mandatory for the search and used to look up the index mapping. */
    private String type;

    /** Index name; mandatory for the search. */
    private String index;

    /** NOTE(review): not read by the visible search code — purpose to be confirmed. */
    private String fieldName;

    /** NOTE(review): not read by the visible search code — purpose to be confirmed. */
    private String fields;

    /** When not blank, matched against all {@code text} fields of the index mapping. */
    private String keywords;

    /** 1-based page number (the search offset is {@code (page - 1) * pageSize}); mandatory. */
    private Integer page;

    /** Number of hits per page; mandatory. */
    private Integer pageSize;

    /** Whether matched fields are replaced by their highlight fragment; defaults to {@code false}. */
    private Boolean isHighlight = false;

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/742848.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号