ES7 in Practice: A Simple Walkthrough

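Background: our existing reports queried MySQL directly, and part of the business data was stored as JSON in MySQL columns. As the business grew, two bottlenecks appeared:

1. Fields inside the JSON also had to be usable as query conditions. MySQL does support JSON queries, but MyBatis did not appear to support them well, and their performance was unproven.

2. Single tables grew to tens of millions of rows, and because report queries span wide ranges of conditions, the existing report queries started timing out.

To solve these problems we decided to migrate the business data from MySQL to an ES cluster. To keep the data easy to inspect, we migrated it one-to-one: the data stored as JSON in MySQL is defined as the nested type in ES.

The implementation is as follows: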

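Add the Maven dependencies (version 7.5.1, supplied through the es.version property):

```xml
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${es.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${es.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client-sniffer</artifactId>
    <version>${es.version}</version>
</dependency>
```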

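ES client initialization configuration:

```java
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.sniff.SniffOnFailureListener;
import org.elasticsearch.client.sniff.Sniffer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Getter
@Configuration
public class EsConfiguration {

    /** Interval between regular sniffs of the cluster's node list (ms). */
    private static final int SNIFF_INTERVAL_MILLIS = 60000;
    /** Delay before the next sniff after a node failure (ms). */
    private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 10000;

    // Comma-separated hosts, e.g. "es1:9200,es2"; entries without a port use es.es7.http.port
    @Value("${es.es7.host}")
    private String serverHost;
    @Value("${es.es7.http.port}")
    private Integer serverPort;
    @Value("${es.es7.username}")
    private String userName;
    @Value("${es.es7.password}")
    private String password;

    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        // Basic authentication for every request
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

        List<HttpHost> httpHosts = new ArrayList<>();
        for (String host : serverHost.split(",")) {
            int index = host.indexOf(':');
            int port = serverPort;
            String address = host;
            if (index != -1) {
                port = Integer.parseInt(host.substring(index + 1));
                address = host.substring(0, index);
            }
            httpHosts.add(new HttpHost(address, port));
        }

        // Triggers an immediate sniff whenever a node fails
        SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
        RestClientBuilder restClientBuilder = RestClient.builder(httpHosts.toArray(new HttpHost[0]))
                .setFailureListener(sniffOnFailureListener)
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    httpClientBuilder.disableAuthCaching();
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                });
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);

        // Periodically refreshes the node list of the low-level client.
        // Note: this Sniffer is never closed here; the ES docs recommend closing it right before the client.
        Sniffer sniffer = Sniffer.builder(restHighLevelClient.getLowLevelClient())
                .setSniffIntervalMillis(SNIFF_INTERVAL_MILLIS)
                .setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS)
                .build();
        sniffOnFailureListener.setSniffer(sniffer);
        return restHighLevelClient;
    }
}
```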
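The host-parsing loop in restHighLevelClient() is easy to get subtly wrong (default ports, stray spaces), so here is a minimal, dependency-free sketch of the same logic that can be unit-tested on its own. `EsHostParser` and `HostPort` are hypothetical names; `HostPort` stands in for Apache's `HttpHost`, and the `trim()`/empty-entry handling is an addition that the original loop does not have.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring the host-parsing loop of restHighLevelClient(). */
final class EsHostParser {

    /** Plain value holder standing in for org.apache.http.HttpHost. */
    static final class HostPort {
        final String address;
        final int port;

        HostPort(String address, int port) {
            this.address = address;
            this.port = port;
        }

        @Override
        public String toString() {
            return address + ":" + port;
        }
    }

    private EsHostParser() {
    }

    /** "es1:9201,es2" with defaultPort 9200 -> [es1:9201, es2:9200]. */
    static List<HostPort> parse(String serverHost, int defaultPort) {
        List<HostPort> result = new ArrayList<>();
        for (String raw : serverHost.split(",")) {
            String host = raw.trim();          // tolerate "es1, es2"
            if (host.isEmpty()) {
                continue;                      // skip empty entries
            }
            int index = host.indexOf(':');
            if (index == -1) {
                // no explicit port: fall back to the configured default
                result.add(new HostPort(host, defaultPort));
            } else {
                result.add(new HostPort(host.substring(0, index),
                        Integer.parseInt(host.substring(index + 1))));
            }
        }
        return result;
    }
}
```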

ES operations class (official API documentation: Java REST Client [7.15] | Elastic):

```java
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// EsOperationService, EsQueryPageResponseVO and BusinessException are project classes
@Service
@Slf4j
public class EsOperationServiceImpl implements EsOperationService {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /** Checks whether the given indices exist. */
    public boolean existIndex(List<String> indices) {
        try {
            return restHighLevelClient.indices().exists(new GetIndexRequest(indices.toArray(new String[0])), RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("EsOperationServiceImpl.existIndex {} error :", indices, e);
            throw new BusinessException("restClient.existIndex error :", e);
        }
    }

    /** Creates an index with an optional explicit mapping. */
    public boolean createIndex(String index, XContentBuilder builder, Integer shards, Integer replicas) {
        boolean success = false;
        try {
            CreateIndexRequest request = new CreateIndexRequest(index);
            if (builder != null) {
                request.mapping(builder);
            }
            request.settings(Settings.builder()
                    // number of primary shards (fixed once the index exists)
                    .put("index.number_of_shards", shards)
                    // number of replicas
                    .put("index.number_of_replicas", replicas)
                    // allows deep from + size paging; large windows cost heap on every shard
                    .put("index.max_result_window", Integer.MAX_VALUE));
            CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);
            success = createIndexResponse.isAcknowledged() && createIndexResponse.isShardsAcknowledged();
            return success;
        } catch (IOException | ElasticsearchException e) {
            log.error("EsOperationServiceImpl.createIndex {} error :", index, e);
            throw new BusinessException("restClient.createIndex error :", e);
        } finally {
            log.info("EsOperationServiceImpl.createIndex {} success? {}", index, success);
        }
    }

    @Override
    public boolean putSettings(String index, Integer shards, Integer replicas) {
        // index.number_of_shards is a static setting: it can only be set when the index is created
        // (or changed via split/shrink), so only the replica count is updated here
        UpdateSettingsRequest request = new UpdateSettingsRequest(index);
        request.settings(Settings.builder().put("index.number_of_replicas", replicas));
        AcknowledgedResponse updateSettingsResponse;
        try {
            updateSettingsResponse = restHighLevelClient.indices().putSettings(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("EsOperationServiceImpl.putSettings {} error :", index, e);
            throw new BusinessException("EsOperationServiceImpl.putSettings error :", e);
        }
        log.info("EsOperationServiceImpl.putSettings index {} replicas {} (requested shards {} not applied) acknowledged {}",
                index, replicas, shards, updateSettingsResponse.isAcknowledged());
        return updateSettingsResponse.isAcknowledged();
    }

    /** Deletes an index; with async = true the request is fire-and-forget and only logged. */
    @Override
    public boolean deleteIndex(String index, boolean async) throws IOException {
        try {
            DeleteIndexRequest request = new DeleteIndexRequest(index);
            if (async) {
                restHighLevelClient.indices().deleteAsync(request, RequestOptions.DEFAULT,
                        new ActionListener<AcknowledgedResponse>() {
                            @Override
                            public void onResponse(AcknowledgedResponse acknowledgedResponse) {
                                log.info("EsOperationServiceImpl.deleteIndex {} async acknowledged? {}", index, acknowledgedResponse.isAcknowledged());
                            }

                            @Override
                            public void onFailure(Exception e) {
                                log.error("EsOperationServiceImpl.deleteIndex {} async failed :", index, e);
                            }
                        });
                return true;
            }
            request.timeout(TimeValue.timeValueMinutes(5));
            AcknowledgedResponse deleteIndexResponse = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
            return deleteIndexResponse.isAcknowledged();
        } catch (ElasticsearchException e) {
            log.error("EsOperationService.deleteIndex {} throw exception :", index, e);
            // a missing index counts as already deleted
            return e.status() == RestStatus.NOT_FOUND;
        }
    }

    /** Bulk-indexes documents; sourceMap maps document id -> JSON source. */
    public BulkResponse batchInsert(String index, Map<String, String> sourceMap) {
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout(TimeValue.timeValueMinutes(2));
        // one IndexRequest per entry of the map
        for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
            bulkRequest.add(new IndexRequest(index).id(entry.getKey()).source(entry.getValue(), XContentType.JSON));
        }
        try {
            BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            // bulk is not transactional: items that succeeded stay written even if others failed
            if (bulkResponse.hasFailures()) {
                StringBuilder failedMsg = new StringBuilder();
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    if (bulkItemResponse.isFailed()) {
                        failedMsg.append(bulkItemResponse.getFailure().getMessage()).append("; ");
                    }
                    if (failedMsg.length() > 20000) {
                        // long enough to diagnose; stop before it wastes memory
                        break;
                    }
                }
                log.error("EsOperationService.batchInsert {} has errorMsg {}", index, failedMsg);
                throw new BusinessException(String.format("Bulk insert into index %s failed", index));
            }
            log.info("EsOperationService.batchInsert {} {} docs success", index, sourceMap.size());
            return bulkResponse;
        } catch (IOException e) {
            log.error("EsOperationService.batchInsert {} throw exception :", index, e);
            throw new BusinessException("restClient.batchInsert.error :", e);
        }
    }

    /** Indexes one document and refreshes immediately so it is searchable at once (costly under heavy writes). */
    public String save(String index, String id, String source) {
        try {
            IndexResponse indexResponse = restHighLevelClient.index(new IndexRequest(index).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                    .id(id).source(source, XContentType.JSON), RequestOptions.DEFAULT);
            ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
            if (shardInfo.getFailed() > 0) {
                StringBuilder failedMsg = new StringBuilder();
                for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                    failedMsg.append(failure.reason()).append("; ");
                    if (failedMsg.length() > 20000) {
                        break;
                    }
                }
                throw new BusinessException(String.format("Saving data to index %s failed: %s", index, failedMsg));
            }
            return indexResponse.getId();
        } catch (IOException | ElasticsearchException e) {
            log.error("EsOperationService.index {} save throw exception :", index, e);
            throw new BusinessException("Saving data to ES failed", e);
        } finally {
            log.info("EsOperationService.index done, index={}, id={}", index, id);
        }
    }

    /** Deletes one document; a document that is already gone also counts as success. */
    public boolean deleteById(String index, String id) throws IOException {
        DeleteRequest request = new DeleteRequest(index, id);
        DeleteResponse deleteResponse = restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        DocWriteResponse.Result result = deleteResponse.getResult();
        log.info("EsOperationServiceImpl.deleteById index {} id {} result {}", index, id, result.name());
        return result == DocWriteResponse.Result.DELETED || result == DocWriteResponse.Result.NOT_FOUND;
    }

    /** Paged search with optional aggregations; pageNum is 1-based. */
    public EsQueryPageResponseVO<Map<String, Object>> pageQuery(List<String> indices, BoolQueryBuilder queryBuilder, String sortField, SortOrder sortOrder,
                                                                List<AggregationBuilder> aggregationBuilders, Integer pageNum, Integer size) {
        EsQueryPageResponseVO<Map<String, Object>> pageQueryVO = new EsQueryPageResponseVO<>();
        // ES "from" is a 0-based document offset
        int page = (pageNum != null && pageNum > 0) ? pageNum - 1 : 0;
        SearchSourceBuilder searchSourceBuilder = SearchSourceBuilder.searchSource().query(queryBuilder)
                .from(page * size).size(size)
                // exact hit count instead of the default lower bound of 10,000
                .trackTotalHits(true).explain(false);
        if (sortField != null) {
            searchSourceBuilder.sort(sortField, sortOrder);
        }
        if (!CollectionUtils.isEmpty(aggregationBuilders)) {
            aggregationBuilders.forEach(searchSourceBuilder::aggregation);
        }
        try {
            SearchRequest searchRequest = new SearchRequest(indices.toArray(new String[0])).source(searchSourceBuilder);
            Instant startTime = Instant.now();
            SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            log.info("EsOperationServiceImpl.pageQuery indices {} param {} cost {} ms", indices,
                    searchSourceBuilder, Duration.between(startTime, Instant.now()).toMillis());
            long totalCount = response.getHits().getTotalHits().value;
            long totalPage = (totalCount % size) == 0 ? (totalCount / size) : (totalCount / size) + 1;
            pageQueryVO.setTotalSize((int) totalCount);
            pageQueryVO.setPages((int) totalPage);
            List<Map<String, Object>> content = new ArrayList<>();
            for (SearchHit hit : response.getHits().getHits()) {
                content.add(hit.getSourceAsMap());
            }
            pageQueryVO.setContent(content);
            pageQueryVO.setAggregations(response.getAggregations());
        } catch (IOException | ElasticsearchException e) {
            log.error("EsOperationService.pageQuery indices {} param {} failed :", indices, searchSourceBuilder, e);
            throw new BusinessException("ES query failed", e);
        }
        return pageQueryVO;
    }
}
```
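The paging arithmetic in pageQuery() can be pulled out and checked in isolation. The sketch below (hypothetical class `EsPaging`) turns a 1-based page number into the ES `from` offset and computes the page count by ceiling division; `Math.multiplyExact` is an addition that fails loudly instead of overflowing on absurdly large page numbers.

```java
/** Hypothetical helper mirroring the paging arithmetic in pageQuery(). */
final class EsPaging {

    private EsPaging() {
    }

    /** 1-based page number -> 0-based ES "from" offset; pageNum < 1 is treated as page 1. */
    static int from(int pageNum, int size) {
        int zeroBasedPage = pageNum > 0 ? pageNum - 1 : 0;
        // throws ArithmeticException instead of silently overflowing
        return Math.multiplyExact(zeroBasedPage, size);
    }

    /** Ceiling division of the total hit count by the page size. */
    static long totalPages(long totalHits, int size) {
        return totalHits % size == 0 ? totalHits / size : totalHits / size + 1;
    }
}
```

Keep in mind that `from + size` is capped by `index.max_result_window` (10,000 by default); createIndex() above raises that cap to `Integer.MAX_VALUE`, which permits deep paging at the cost of heap on every shard the query touches.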

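Because every field in our business has to be queryable, and the types ES assigns by default when a document is first added may not meet that requirement, we create the index first and only then add data. Creating the index dynamically works as follows.

ES data type enum (implements the mapping from business data types to ES data types, plus the query strategy for each type):

```java
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
// FieldType (the business field types) and BusinessException are project classes

@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public enum EsDocumentFieldTypeEnum {
    TEXT("text"),
    KEYWORD("keyword"),
    LONG("long"),
    INTEGER("integer"),
    DOUBLE("double"),
    FLOAT("float"),
    BOOLEAN("boolean"),
    DATE("date"),
    OBJECT("object"),
    NESTED("nested"),
    ;

    /** Type name as written in the ES mapping. */
    private final String value;

    /** Maps a business field type to the ES type used in the mapping. */
    public static EsDocumentFieldTypeEnum getByFieldType(FieldType fieldType) {
        if (fieldType == FieldType.NUMBER) {
            return LONG;
        }
        if (fieldType == FieldType.FLOAT) {
            return DOUBLE;
        }
        if (fieldType == FieldType.CHAR) {
            // keyword is not analyzed, so exact, prefix and wildcard matching work on the whole value
            return KEYWORD;
        }
        if (fieldType == FieldType.DATE || fieldType == FieldType.TIME) {
            return DATE;
        }
        throw new BusinessException(String.format("Unmapped business field type %s; add a mapping here", fieldType.getCode()));
    }

    /** Builds the query for one search condition according to the field's ES type. */
    public static QueryBuilder getQueryBuilders(FieldType fieldType, String fieldCode, String value) {
        EsDocumentFieldTypeEnum esType = getByFieldType(fieldType);
        value = value.trim();
        if (esType == TEXT || esType == KEYWORD) {
            // text types: a leading "*" means "contains" (wildcard), otherwise "starts with" (prefix)
            if (value.startsWith("*")) {
                String queryValue = value.endsWith("*") ? value : value.concat("*");
                return QueryBuilders.wildcardQuery(fieldCode, queryValue);
            }
            return QueryBuilders.prefixQuery(fieldCode, value);
        }
        // numbers and dates: match query
        return QueryBuilders.matchQuery(fieldCode, value);
    }
}
```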
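To see which query each search value turns into, here is a dependency-free restatement of the getQueryBuilders() decision logic that renders the short-form Query DSL instead of building `QueryBuilder` objects. `QueryStrategy` and its `EsType` enum are hypothetical stand-ins for the enum above, and JSON escaping is deliberately omitted, so treat the output as illustrative rather than what the ES client serializes.

```java
/** Hypothetical, dependency-free restatement of getQueryBuilders()'s branching. */
final class QueryStrategy {

    /** Stand-in for EsDocumentFieldTypeEnum (subset). */
    enum EsType { TEXT, KEYWORD, LONG, DOUBLE, DATE }

    private QueryStrategy() {
    }

    /** Returns the short-form Query DSL for one condition; values are not JSON-escaped. */
    static String toDsl(EsType type, String field, String rawValue) {
        String value = rawValue.trim();
        if (type == EsType.TEXT || type == EsType.KEYWORD) {
            if (value.startsWith("*")) {
                // leading "*" = "contains": make sure the pattern also ends with "*"
                String pattern = value.endsWith("*") ? value : value + "*";
                return "{\"wildcard\":{\"" + field + "\":\"" + pattern + "\"}}";
            }
            // otherwise a "starts with" lookup
            return "{\"prefix\":{\"" + field + "\":\"" + value + "\"}}";
        }
        // numbers and dates fall through to a match query
        return "{\"match\":{\"" + field + "\":\"" + value + "\"}}";
    }
}
```

One consequence worth knowing: a pattern that begins with `*` makes ES iterate over many more terms, so the "contains" branch is noticeably slower than the prefix branch on large indices.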

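Generating the XContentBuilder that defines the index properties: we defined a class whose fields correspond to the index properties and annotated each field with a custom annotation specifying its ES field type (the enum above). The XContentBuilder used to create the index is then built automatically via reflection.

```java
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Map;
// GROUP_JSON (statically imported in the original) names the nested JSON field;
// FieldType, EsField, BusinessEsDocumentVO and BusinessException are project classes

// Hypothetical holder class: the article shows only the method
public final class EsMappingUtils {

    /**
     * Builds the "properties" mapping from the @EsField annotations on BusinessEsDocumentVO.
     * groupKey maps each sub-field of the nested GROUP_JSON field to its business type.
     */
    public static XContentBuilder initXContentBuilder(Map<String, FieldType> groupKey) throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder();
        builder.startObject();
        {
            builder.startObject("properties");
            {
                for (Field field : BusinessEsDocumentVO.class.getDeclaredFields()) {
                    if (field.isSynthetic()) {
                        // Skip $jacocoData: it does not exist locally, but the deployment build runs
                        // JaCoCo for unit-test coverage, which injects this synthetic field
                        continue;
                    }
                    EsField annotation = field.getAnnotation(EsField.class);
                    if (annotation == null) {
                        throw new BusinessException(String.format("Field %s must carry @EsField", field.getName()));
                    }
                    builder.startObject(annotation.fieldName());
                    {
                        EsDocumentFieldTypeEnum esType = annotation.fieldType();
                        builder.field("type", esType.getValue());
                        if (StringUtils.isNotBlank(annotation.format())) {
                            builder.field("format", annotation.format());
                        }
                        // sub-properties of the nested field, derived from the business types in groupKey
                        if (StringUtils.equals(annotation.fieldName(), GROUP_JSON.getFieldName())) {
                            builder.startObject("properties");
                            {
                                for (Map.Entry<String, FieldType> entry : groupKey.entrySet()) {
                                    builder.startObject(entry.getKey());
                                    {
                                        EsDocumentFieldTypeEnum subType = EsDocumentFieldTypeEnum.getByFieldType(entry.getValue());
                                        builder.field("type", subType.getValue());
                                        if (subType == EsDocumentFieldTypeEnum.DATE) {
                                            // accepted input formats, tried in order
                                            builder.field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_second");
                                        }
                                    }
                                    builder.endObject();
                                }
                            }
                            builder.endObject();
                        }
                    }
                    builder.endObject();
                }
            }
            builder.endObject();
        }
        builder.endObject();
        return builder;
    }
}
```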
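The reflection idea can be tried without an ES dependency. The sketch below uses a hypothetical `@EsFieldDemo` annotation and `OrderDocDemo` class (not the author's `@EsField`/`BusinessEsDocumentVO`) and emits the mapping as a JSON string instead of going through `XContentBuilder`; the field typed `nested` takes its sub-properties from a map, much as `groupKey` does above. `getDeclaredFields()` guarantees no particular order, which is harmless here because ES does not care about property order.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;

/** Hypothetical stand-in for @EsField: ES property name, type and optional format. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface EsFieldDemo {
    String fieldName();

    String type();

    String format() default "";
}

/** Hypothetical document class: each annotated field becomes one index property. */
class OrderDocDemo {
    @EsFieldDemo(fieldName = "order_no", type = "keyword")
    private String orderNo;

    @EsFieldDemo(fieldName = "created_at", type = "date", format = "yyyy-MM-dd HH:mm:ss")
    private String createdAt;

    @EsFieldDemo(fieldName = "group_json", type = "nested")
    private Map<String, Object> groupJson;
}

final class MappingSketch {

    private MappingSketch() {
    }

    /** Renders {"properties":{...}}; nestedProps supplies the sub-properties of the "nested" field. */
    static String mappingJson(Class<?> docClass, Map<String, String> nestedProps) {
        StringBuilder sb = new StringBuilder("{\"properties\":{");
        boolean first = true;
        for (Field field : docClass.getDeclaredFields()) {
            if (field.isSynthetic()) {
                continue; // compiler- or tool-generated fields such as $jacocoData
            }
            EsFieldDemo es = field.getAnnotation(EsFieldDemo.class);
            if (es == null) {
                throw new IllegalStateException("Field " + field.getName() + " must carry @EsFieldDemo");
            }
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(es.fieldName()).append("\":{\"type\":\"").append(es.type()).append('"');
            if (!es.format().isEmpty()) {
                sb.append(",\"format\":\"").append(es.format()).append('"');
            }
            if ("nested".equals(es.type())) {
                sb.append(",\"properties\":{");
                boolean firstSub = true;
                for (Map.Entry<String, String> sub : nestedProps.entrySet()) {
                    if (!firstSub) {
                        sb.append(',');
                    }
                    firstSub = false;
                    sb.append('"').append(sub.getKey()).append("\":{\"type\":\"").append(sub.getValue()).append("\"}");
                }
                sb.append('}');
            }
            sb.append('}');
        }
        return sb.append("}}").toString();
    }
}
```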

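Paged query (base fields as query conditions, plus fields inside the nested object as dynamic conditions) and summation of amount fields.

Business code cannot be published, so a simplified version follows:
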
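```java
    // Simplified: "<...>" strings are placeholders for real field names and values.
    // ScoreMode is org.apache.lucene.search.join.ScoreMode
    private BoolQueryBuilder initBoolQueryBuilder(QuerySummaryResultRequest request) {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        // filter context: no relevance scoring, and the clauses can be cached.
        // match_phrase analyzes the value and requires its terms adjacent and in order;
        // use termQuery on a keyword field for a strict exact match
        boolQueryBuilder.filter(QueryBuilders.matchPhraseQuery("<field1>", "<value>"));
        // date range, both bounds inclusive
        boolQueryBuilder.filter(QueryBuilders.rangeQuery("<dateField>").gte("<start>").lte("<end>"));

        // condition on a sub-field of the nested field: address it as "<nestedPath>.<subField>"
        // and wrap it in a nested query whose path is the nested field itself.
        // fieldType is the sub-field's business type (its lookup is elided in the original)
        QueryBuilder subFieldQuery = EsDocumentFieldTypeEnum.getQueryBuilders(fieldType,
                "<nestedPath>".concat(".").concat("<subField>"), "<searchValue>");
        boolQueryBuilder.filter(QueryBuilders.nestedQuery("<nestedPath>", subFieldQuery, ScoreMode.None));
        return boolQueryBuilder;
    }

    private List<AggregationBuilder> initAggBuilder() {
        List<AggregationBuilder> aggList = new ArrayList<>();
        // sum over a top-level field; "<sumName>" is the key used to read the result back
        SumAggregationBuilder sumAgg = AggregationBuilders.sum("<sumName>")
                .field("<fieldToSum>");
        aggList.add(sumAgg);
        // sum over a nested sub-field: a nested aggregation on the nested path wrapping a sum.
        // Sibling aggregations must have unique names, so this one cannot reuse "<sumName>"
        NestedAggregationBuilder nestedAgg = AggregationBuilders
                .nested("<nestedAggName>", "<nestedPath>")
                .subAggregation(AggregationBuilders.sum("<nestedSumName>")
                        .field("<nestedPath>".concat(".").concat("<subFieldToSum>")));
        aggList.add(nestedAgg);
        return aggList;
    }
```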

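Reading the sum results from the paged query:

```java
// aggregations = response.getAggregations() (returned as pageQueryVO.getAggregations());
// ParsedSum: org.elasticsearch.search.aggregations.metrics, ParsedNested: ...aggregations.bucket.nested;
// DataTypeUtils is a project helper

// sum over a top-level field, looked up by the name given to AggregationBuilders.sum(...)
ParsedSum dataCount = aggregations.get("<sumName>");
String value = DataTypeUtils.doubleToString(dataCount.getValue());

// sum over a nested sub-field: first the nested aggregation, then the sum sub-aggregation inside it
ParsedNested parsedNested = aggregations.get("<nestedAggName>");
Aggregations nestedAggregations = parsedNested.getAggregations();
ParsedSum nestedSum = nestedAggregations.get("<nestedSumName>");
String nestedValue = DataTypeUtils.doubleToString(nestedSum.getValue());
```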

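Worth noting: in practice, summing decimal values lost precision. After some research we settled on storing them as integers: our amount fields are accurate to the cent (two decimal places), so multiplying by 100 is enough. The integers are stored in ES, and whenever a sum is needed we take the aggregated result and convert it back to a decimal.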
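A minimal sketch of the integer-cents approach, assuming amounts arrive as decimal strings with at most two decimal places. `CentsSketch` and its methods are hypothetical; `BigDecimal` does the conversions so no `double` rounding happens on the way in or out, and an amount with more than two decimals is rejected rather than silently truncated.

```java
import java.math.BigDecimal;
import java.util.List;

/** Hypothetical helper: store amounts as long cents, sum exactly, convert back for display. */
final class CentsSketch {

    private CentsSketch() {
    }

    /** "12.34" -> 1234; throws ArithmeticException if the amount has more than 2 decimals. */
    static long toCents(String amount) {
        return new BigDecimal(amount).movePointRight(2).longValueExact();
    }

    /** Exact sum in cents; throws ArithmeticException on long overflow. */
    static long sumCents(List<String> amounts) {
        long total = 0;
        for (String amount : amounts) {
            total = Math.addExact(total, toCents(amount));
        }
        return total;
    }

    /** 30 -> "0.30". */
    static String fromCents(long cents) {
        return BigDecimal.valueOf(cents, 2).toPlainString();
    }
}
```

On the ES side the field would then be mapped as `long`. `ParsedSum.getValue()` still returns a `double`, but a sum of whole cents stays exact while it is below 2^53, so something like `fromCents(Math.round(sum.getValue()))` should recover the amount.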

Reprint notice: this article was reprinted from www.mshxw.com
Article URL: https://www.mshxw.com/it/274876.html