背景:公司原有报表数据直接查询的mysql,同时部分业务数据以JSON格式存储于mysql字段中, 而随着业务的发展有以下瓶颈出现
1、需要JSON中的字段也能作为查询条件。虽然mysql支持JSON查询,但是mybatis貌似不支持,并且性能有待考证
2、随着业务的发展,单表数据达到千万级别,而报表查询的查询条件跨度较大,现有报表查询进程出现超时的情况
为了解决以上问题,决定把mysql的业务数据迁移到ES集群。为了方便数据查看,做了数据的平移:mysql中存储为JSON格式的数据,迁移到ES后数据类型定义为nested
实践如下:
添加maven依赖(7.5.1)
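<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>${es.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${es.version}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client-sniffer</artifactId>
    <version>${es.version}</version>
</dependency>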
es客户端初始化配置
@Slf4j
@Getter
@Configuration
public class EsConfiguration {
private final static int SNIFF_INTERVAL_MILLIS = 60000;
private final static int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 10000;
@value("es.es7.host")
private String serverHost;
@value("es.es7.http.port")
private Integer serverPort;
@value("es.es7.username")
private String userName;
@value("es.es7.password")
private String password;
@Bean(destroyMethod = "close")
public RestHighLevelClient restHighLevelClient() {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
ArrayList httpHosts = new ArrayList<>();
String[] hosts = serverHost.split(",");
for (String host : hosts) {
int index = host.indexOf(":");
int port = serverPort;
String address = host;
if (index != -1) {
port = Integer.parseInt(host.substring(index + 1));
address = host.substring(0, index);
}
httpHosts.add(new HttpHost(address, port));
}
SniffonFailureListener sniffonFailureListener = new SniffonFailureListener();
HttpHost[] hostsArray = httpHosts.toArray(new HttpHost[httpHosts.size()]);
RestClientBuilder restClientBuilder = RestClient.builder(hostsArray)
.setFailureListener(sniffOnFailureListener)
.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
});
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
SnifferBuilder snifferBuilder = Sniffer.builder(restHighLevelClient.getLowLevelClient())
.setSniffIntervalMillis(SNIFF_INTERVAL_MILLIS).setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS);
sniffOnFailureListener.setSniffer(snifferBuilder.build());
return restHighLevelClient;
}
es操作类(官方API文档地址:Java REST Client [7.15] | Elastic)
/**
 * Spring service implementing {@code EsOperationService} on top of the
 * Elasticsearch high-level REST client.
 */
@Service
@Slf4j
public class EsOperationServiceImpl implements EsOperationService {
// Client injected by Spring; every operation in this class goes through it.
@Autowired
RestHighLevelClient restHighLevelClient;
/**
 * Asks the cluster whether the given indices exist.
 *
 * @param indices names of the indices to check
 * @return the result of the indices-exists call
 * @throws BusinessException if the request fails with an {@link IOException}
 */
public boolean existIndex(List<String> indices) {
    try {
        GetIndexRequest request = new GetIndexRequest(indices.toArray(new String[0]));
        return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        // Logged at error level, consistent with the other operations of this service.
        log.error("EsOperationServiceImpl.existIndex {} error :", JSONObject.toJSONString(indices), e);
        throw new BusinessException("restClient.existIndex error :", e);
    }
}
/**
 * Creates an index with an optional mapping and the given shard/replica counts.
 * The index is also created with {@code index.max_result_window} set to
 * {@link Integer#MAX_VALUE}.
 *
 * @param index    name of the index to create
 * @param builder  mapping definition; skipped when {@code null}
 * @param shards   value for {@code index.number_of_shards}
 * @param replicas value for {@code index.number_of_replicas}
 * @return {@code true} when both the request and the shards were acknowledged
 * @throws BusinessException wrapping any exception raised while creating the index
 */
public boolean createIndex(String index, XContentBuilder builder, Integer shards, Integer replicas) {
    boolean created = false;
    try {
        CreateIndexRequest createRequest = new CreateIndexRequest(index);
        if (builder != null) {
            createRequest.mapping(builder);
        }

        Settings.Builder indexSettings = Settings.builder()
                // number of shards
                .put("index.number_of_shards", shards)
                // number of replicas
                .put("index.number_of_replicas", replicas)
                .put("index.max_result_window", Integer.MAX_VALUE);
        createRequest.settings(indexSettings);

        CreateIndexResponse response =
                restHighLevelClient.indices().create(createRequest, RequestOptions.DEFAULT);
        created = response.isAcknowledged() && response.isShardsAcknowledged();
        return created;
    } catch (Exception e) {
        log.error("EsOperationServiceImpl.createIndex {} error :", index, e);
        throw new BusinessException("restClient.createIndex error :", e);
    } finally {
        // Runs on both paths, so a failed attempt is reported with created == false.
        log.info("EsOperationServiceImpl.createIndex {} 成功 ? {}", index, created);
    }
}
/**
 * Sends an update-settings request carrying the shard and replica counts.
 *
 * @param index    name of the index to update
 * @param shards   value sent as {@code index.number_of_shards}
 * @param replicas value sent as {@code index.number_of_replicas}
 * @return whether the cluster acknowledged the update
 * @throws BusinessException if the request fails with an {@link IOException}
 */
@Override
public boolean putSettings(String index, Integer shards, Integer replicas) {
    UpdateSettingsRequest request = new UpdateSettingsRequest(index);
    // NOTE(review): Elasticsearch documents index.number_of_shards as a static setting
    // that is fixed at index creation - confirm whether this update can succeed on an
    // existing index, or whether only the replica count should be sent here.
    request.settings(Settings.builder()
            // number of shards
            .put("index.number_of_shards", shards)
            // number of replicas
            .put("index.number_of_replicas", replicas));
    AcknowledgedResponse updateSettingsResponse;
    try {
        updateSettingsResponse = restHighLevelClient.indices().putSettings(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        // Failures are logged at error level, matching createIndex.
        log.error("EsOperationServiceImpl.putSettings {} error :", index, e);
        throw new BusinessException("EsOperationServiceImpl.putSettings error :", e);
    }
    log.info("EsOperationServiceImpl.putSettings 修改索引为 分区{} 副本{} 结果{}", shards, replicas, updateSettingsResponse.isAcknowledged());
    return updateSettingsResponse.isAcknowledged();
}
/**
 * Deletes an index.
 *
 * <p>NOTE(review): when {@code isSync} is {@code true} the delete is issued through
 * {@code deleteAsync} and this method returns immediately; when it is {@code false} the
 * call blocks until the cluster answers. The parameter name suggests the opposite -
 * confirm the intended meaning with callers.
 *
 * @param index  name of the index to delete
 * @param isSync {@code true} to fire the delete without waiting for its result
 * @return {@code true} if the asynchronous request was submitted, the blocking delete was
 *         acknowledged, or the index was reported as not found; {@code false} otherwise
 * @throws IOException if the blocking delete fails at the transport level
 */
@Override
public boolean deleteIndex(String index, boolean isSync) throws IOException {
    try {
        DeleteIndexRequest request = new DeleteIndexRequest(index);
        if (isSync) {
            restHighLevelClient.indices().deleteAsync(request, RequestOptions.DEFAULT,
                    new ActionListener<AcknowledgedResponse>() {
                        @Override
                        public void onResponse(AcknowledgedResponse acknowledgedResponse) {
                            log.info("EsOperationServiceImpl.deleteIndex {} sync is success ? {} :", index, acknowledgedResponse.isAcknowledged());
                        }

                        @Override
                        public void onFailure(Exception e) {
                            // The caller has already returned, so this log is the only trace of the failure.
                            log.error("EsOperationServiceImpl.deleteIndex {} sync throw exception :", index, e);
                        }
                    });
            return true;
        }
        request.timeout(TimeValue.timeValueMinutes(5));
        AcknowledgedResponse deleteIndexResponse = restHighLevelClient.indices().delete(request, RequestOptions.DEFAULT);
        return deleteIndexResponse.isAcknowledged();
    } catch (ElasticsearchException e) {
        log.error("EsOperationService.deleteIndex {} throw exception :", index, e);
        // A missing index already satisfies the goal of the delete.
        return e.status() == RestStatus.NOT_FOUND;
    }
}
/**
 * Indexes a batch of JSON documents with a single bulk request.
 *
 * @param index     target index
 * @param sourceMap documents to index, keyed by document id; each value is the JSON source
 * @return the bulk response when no item failed
 * @throws BusinessException if any item failed or the request fails with an {@link IOException}
 */
public BulkResponse batchInsert(String index, Map<String, String> sourceMap) {
    BulkRequest bulkRequest = new BulkRequest();
    bulkRequest.timeout(TimeValue.timeValueMinutes(2));
    // One index request per (id, source) entry.
    for (Map.Entry<String, String> entry : sourceMap.entrySet()) {
        IndexRequest indexRequest = new IndexRequest(index).id(entry.getKey()).source(entry.getValue(), XContentType.JSON);
        bulkRequest.add(indexRequest);
    }
    try {
        BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        if (bulkResponse.hasFailures()) {
            StringBuilder failedMsg = new StringBuilder();
            for (BulkItemResponse bulkItemResponse : bulkResponse) {
                if (bulkItemResponse.isFailed()) {
                    failedMsg.append(bulkItemResponse.getFailure().getMessage());
                }
                if (failedMsg.length() > 20000) {
                    // Cap the collected message; a longer one only costs memory.
                    break;
                }
            }
            log.error("EsOperationService.batchInsert {} has errorMsg {} :", index, failedMsg.toString());
            throw new BusinessException(String.format("索引%s批量插入数据异常", index));
        }
        log.info("EsOperationService.batchInsert {} {} 条 success", index, sourceMap.size());
        return bulkResponse;
    } catch (IOException e) {
        log.error("EsOperationService.batchInsert {} throw exception :", index, e);
        throw new BusinessException("restClient.batchInsert.error :", e);
    }
}
/**
 * Indexes a single JSON document with an immediate refresh.
 *
 * @param index  target index
 * @param id     document id
 * @param source JSON source of the document
 * @return the id reported by the index response
 * @throws BusinessException wrapping any exception, including the one raised here when
 *                           the response reports failed shards
 */
public String save(String index, String id, String source) {
    try {
        IndexRequest indexRequest = new IndexRequest(index)
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                .id(id)
                .source(source, XContentType.JSON);
        IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);

        ReplicationResponse.ShardInfo shards = indexResponse.getShardInfo();
        if (shards.getFailed() <= 0) {
            return indexResponse.getId();
        }

        // Collect the shard failure reasons, capped so the message stays bounded.
        StringBuilder reasons = new StringBuilder();
        for (ReplicationResponse.ShardInfo.Failure shardFailure : shards.getFailures()) {
            reasons.append(shardFailure.reason());
            if (reasons.length() > 20000) {
                break;
            }
        }
        // Thrown inside the try, so the catch below logs and re-wraps it.
        throw new BusinessException(String.format("索引%s保存数据异常%s", index, reasons.toString()));
    } catch (Exception e) {
        log.error("EsOperationService.index {} save throw exception :", index, e);
        throw new BusinessException("es保存数据异常", e);
    } finally {
        log.info("EsOperationService.index done, index={}, id={}", index, id);
    }
}
/**
 * Deletes one document by id.
 *
 * @param index index holding the document
 * @param id    id of the document to delete
 * @return {@code true} when the result is {@code DELETED} or {@code NOT_FOUND}
 * @throws IOException if the delete request fails
 */
public boolean deleteById(String index, String id) throws IOException {
    DeleteRequest deleteRequest = new DeleteRequest(index).id(id);
    DocWriteResponse.Result outcome =
            restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT).getResult();
    log.info("EsOperationServiceImpl.deleteById index {} id {} result {}", index, id, outcome.name());
    switch (outcome) {
        case DELETED:
        case NOT_FOUND:
            // A document that was already absent counts as deleted.
            return true;
        default:
            return false;
    }
}
public EsQueryPageResponseVO
因为我们业务中所有字段都要求可查询,而新增文档时es默认的数据类型可能不满足要求,所以我们是先创建索引再新增数据的,动态创建索引实践如下
ES数据类型枚举定义(实现了业务的数据类型与ES数据类型的转换关系,以及查询时的策略)
/**
 * Elasticsearch field types used when building index mappings, together with the
 * conversion from the business {@code FieldType} and the query strategy per type.
 */
@Getter
@NoArgsConstructor
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public enum EsdocumentFieldTypeEnum {
    TEXT("text"),
    KEYWORD("keyword"),
    LONG("long"),
    INTEGER("integer"),
    DOUBLE("double"),
    FLOAT("float"),
    BOOLEAN("boolean"),
    DATE("date"),
    OBJECT("object"),
    NESTED("nested"),
    ;

    /** Type name written into the mapping's {@code type} attribute. */
    private String value;

    /**
     * Maps a business field type to the Elasticsearch type used to store it.
     *
     * @param fieldType business field type
     * @return {@link #LONG} for NUMBER, {@link #DOUBLE} for FLOAT, {@link #KEYWORD} for CHAR,
     *         {@link #DATE} for DATE and TIME
     * @throws BusinessException if the business type has no mapping yet
     */
    public static EsdocumentFieldTypeEnum getByFieldType(FieldType fieldType) {
        if (fieldType == FieldType.NUMBER) {
            return LONG;
        }
        if (fieldType == FieldType.FLOAT) {
            return DOUBLE;
        }
        if (fieldType == FieldType.CHAR) {
            return KEYWORD;
        }
        if (fieldType == FieldType.DATE || fieldType == FieldType.TIME) {
            return DATE;
        }
        throw new BusinessException(String.format("新增的字段业务类型%s,请增加映射逻辑", fieldType.getCode()));
    }

    /**
     * Builds the query used to filter on one field.
     *
     * <p>For text and keyword fields, a value starting with {@code *} becomes a wildcard
     * query (a trailing {@code *} is appended when missing); any other value becomes a
     * prefix query. All other field types use a match query.
     *
     * @param fieldType business type of the field
     * @param fieldCode name of the field to query
     * @param value     search value; surrounding whitespace is trimmed
     * @return the query builder for this field
     */
    public static QueryBuilder getQueryBuilders(FieldType fieldType, String fieldCode, String value) {
        EsdocumentFieldTypeEnum esdocumentFieldTypeEnum = getByFieldType(fieldType);
        value = value.trim();
        if (esdocumentFieldTypeEnum == TEXT || esdocumentFieldTypeEnum == KEYWORD) {
            // Text-like types are matched fuzzily.
            if (value.startsWith("*")) {
                String queryValue = value.endsWith("*") ? value : value.concat("*");
                return QueryBuilders.wildcardQuery(fieldCode, queryValue);
            }
            return QueryBuilders.prefixQuery(fieldCode, value);
        }
        return QueryBuilders.matchQuery(fieldCode, value);
    }
}
创建索引生成index属性定义的 XContentBuilder。 定义了一个类,属性对应的index 的属性,属性用自定义注解标识了 es字段类型(对应了上面的枚举),然后通过反射自动构建XContentBuilder用于 创建索引
public static XContentBuilder initXContentBuilder(MapgroupKey) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); builder.startObject(); { builder.startObject("properties"); { Field[] fields = BusinessEsdocumentVO.class.getDeclaredFields(); for (Field field : fields) { if (field.isSynthetic()) { //排除$jacocoData影响 本地没有这个属性,但是部署系统环境时候,maven集成了jacoco来统计单元测试的代码覆盖率。才会多一个 $jacocoData continue; } EsField annotation = field.getAnnotation(EsField.class); if (annotation == null) { throw new BusinessException(String.format("属性%s必须要有注解@EsField", field.getName())); } builder.startObject(annotation.fieldName()); { EsdocumentFieldTypeEnum esdocumentFieldTypeEnum = annotation.fieldType(); builder.field("type", esdocumentFieldTypeEnum.getValue()); if (StringUtils.isNotBlank(annotation.format())) { builder.field("format", annotation.format()); } //nested 类型字段属性设置 if (StringUtils.equals(annotation.fieldName(), GROUP_JSON.getFieldName())) { builder.startObject("properties"); { Set > entries = groupKey.entrySet(); for (Map.Entry entry : entries) { builder.startObject(entry.getKey()); { EsdocumentFieldTypeEnum fieldType = EsdocumentFieldTypeEnum.getByFieldType(entry.getValue()); builder.field("type", fieldType.getValue()); if (fieldType == EsdocumentFieldTypeEnum.DATE) { builder.field("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_second"); } } builder.endObject(); } } builder.endObject(); } } builder.endObject(); } } builder.endObject(); } builder.endObject(); return builder; }
分页查询(业务以基础字段查询 + nested 中的字段作为动态查询条件)以及 金额字段的求和,
涉及业务的代码不能出现在公网,所以简写如下
/**
 * Builds the filter part of the paged query: a phrase match on a plain field, a date
 * range, and one condition on a sub-field of the nested field.
 *
 * <p>NOTE(review): {@code fieldType} is not declared in this simplified snippet and
 * {@code request} is not read - presumably both come from elided business code.
 *
 * @param request query request
 * @return the bool query holding the three filters
 */
private BoolQueryBuilder initBoolQueryBuilder(QuerySummaryResultRequest request) {
    BoolQueryBuilder filters = QueryBuilders.boolQuery()
            // phrase match (the original note describes this as a non-tokenized match)
            .filter(QueryBuilders.matchPhraseQuery("属性名称1", "属性值"))
            .filter(QueryBuilders.rangeQuery("日期属性").gte("起始").lte("结束"));

    // Condition on a property below the nested field, addressed as "<nested>.<child>".
    String nestedPath = "nested属性名称";
    QueryBuilder nestedCondition = EsdocumentFieldTypeEnum.getQueryBuilders(
            fieldType, nestedPath.concat(".").concat("子属性名称"), "搜索值");
    filters.filter(QueryBuilders.nestedQuery(nestedPath, nestedCondition, ScoreMode.None));
    return filters;
}
/**
 * Builds the sum aggregations: one on a plain field and one on a sub-field of a nested
 * field, the latter wrapped in a nested aggregation.
 *
 * @return list with the plain sum aggregation followed by the nested aggregation
 */
private List initAggBuilder() {
    // Sum over a top-level field.
    SumAggregationBuilder plainSum = AggregationBuilders.sum("求和返回后对应的code")
            .field("需要求和的属性名称");

    // Sum over "<nested>.<child>", registered as a sub-aggregation of the nested one.
    String nestedPath = "对应的父节点(nested类型)的code";
    SumAggregationBuilder nestedSum = AggregationBuilders.sum("求和结果中对应的code")
            .field(nestedPath.concat(".").concat("待求和属性code"));
    NestedAggregationBuilder nestedAggregation = AggregationBuilders
            .nested("求和返回后对应的code", nestedPath)
            .subAggregation(nestedSum);

    List aggregations = new ArrayList();
    aggregations.add(plainSum);
    aggregations.add(nestedAggregation);
    return aggregations;
}
分页查询求和结果获取
//普通属性求和结果获取
ParsedSum dataCount = aggregations.get("构造AggregationBuilders时设置的code");
String value = DataTypeUtils.doubleToString(dataCount.getValue())
//Nested类型字段 下面的属性求和结果获取
ParsedNested parsedNested = aggregations.get("构造nestedAggregationBuilder时设置的code");
Aggregations sumFieldSumResult = parsedNested.getAggregations();
ParsedSum aggregation = sumFieldSumResult.get("构造subAggregation时指定的code");
String nestedValue = DataTypeUtils.doubleToString(aggregation.getValue());
值得注意的地方:实践过程中,小数类型的数据求和后存在精度丢失的情况。查询资料后,最终的处理方式是把小数转换为整数再存储到ES(公司业务是金额字段,都精确到分、保留2位小数,所以乘以100即可);需要求和时,先对整数求和,拿到结果后再除以100转换回小数



