本人最近在做数据迁移,主要是将 MongoDB 和 MySQL 中的数据迁移到 Elasticsearch(ES)中。测试下来效果还不错,故在此记录一下。本篇主要介绍 ES 操作的封装。
如果想看迁移相关的代码,请参见另一篇文章:《从 Mongo 迁移数据到 ES 中》。
代码如下:
package com.jesse.metadata.utils; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import com.jesse.metadata.model.EsDataSync; import org.apache.commons.lang3.StringUtils; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.GetAliasesResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.client.indices.GetIndexRequest; import org.elasticsearch.cluster.metadata.AliasmetaData; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Timevalue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.FuzzyQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import 
org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import lombok.extern.slf4j.Slf4j; @Component @Slf4j public class ElasticsearchTemplate{ @Autowired private RestHighLevelClient restHighLevelClient; public void createIndex(String indexName) throws IOException { // 1. 查询索引是否存在 hdgp_data_api_index GetIndexRequest getIndexRequest = new GetIndexRequest(indexName); boolean exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT); if(!exists){ // 2. 执行创建请求 使用默认参数 请求之后获得响应 CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName); createIndexRequest.settings(Settings.builder() .put("index.number_of_shards", 5) .put("index.number_of_replicas", 1)); CreateIndexResponse createIndexResponse = restHighLevelClient.indices().create(createIndexRequest, RequestOptions.DEFAULT); log.info("索引创建成功:{}",createIndexResponse.index()); } } public List getIndexListByAlias(String aliasName) throws IOException { List indexList = new ArrayList<>(); GetAliasesRequest getAliasesRequest = new GetAliasesRequest(); getAliasesRequest.aliases(aliasName); GetAliasesResponse getAliasesResponse = restHighLevelClient.indices() .getAlias(getAliasesRequest,RequestOptions.DEFAULT); Map > resultMap = getAliasesResponse.getAliases(); if(!CollectionUtils.isEmpty(resultMap)){ indexList = resultMap.keySet().stream().collect(Collectors.toList()); } return indexList; } public boolean renameAliases(String oldIndexName, String newIndexName, String aliasName) throws IOException { IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); if(StringUtils.isNotBlank(oldIndexName)){ indicesAliasesRequest .addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(oldIndexName).alias(aliasName)); } indicesAliasesRequest .addAliasAction(IndicesAliasesRequest.AliasActions.add().index(newIndexName).alias(aliasName)); AcknowledgedResponse 
acknowledgedResponse = restHighLevelClient.indices() .updateAliases(indicesAliasesRequest,RequestOptions.DEFAULT); return acknowledgedResponse.isAcknowledged(); } public boolean removeAliases(String indexName, String aliasName) throws IOException { IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.remove().index(indexName).alias(aliasName)); AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices() .updateAliases(indicesAliasesRequest,RequestOptions.DEFAULT); return acknowledgedResponse.isAcknowledged(); } public boolean addAliases(String indexName, String aliasName) throws IOException { IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest(); indicesAliasesRequest.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(indexName).alias(aliasName)); AcknowledgedResponse acknowledgedResponse = restHighLevelClient.indices() .updateAliases(indicesAliasesRequest,RequestOptions.DEFAULT); return acknowledgedResponse.isAcknowledged(); } public void deleteIndex(String indexName) throws IOException { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); AcknowledgedResponse deleteAcknowledgedResponse = restHighLevelClient.indices().delete(deleteIndexRequest, RequestOptions.DEFAULT); log.info((deleteAcknowledgedResponse.isAcknowledged() == true) ? "该索引删除成功!" 
: "该索引删除失败!"); } public void createdocument(String indexName, EsDataSync esDataSync) throws IOException { //插入的文档对象 IndexRequest indexRequest = new IndexRequest(indexName); // 创建HTTP请求 PUT hdgp_data_api_index/_doc/1 是Restful的写法 indexRequest.id(esDataSync.getEsId()); indexRequest.timeout(Timevalue.timevalueSeconds(1)); //装入JSON对象数据 indexRequest.source(JsonUtil.toJsonStr(esDataSync.getData()), XContentType.JSON); //发送请求 并拿到结果 IndexResponse indexResponse = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT); log.info("response status:{},response result:{}" ,indexResponse.status(),indexResponse.toString()); } public String querydocumentById(String indexName, String documentId) throws IOException { GetRequest getRequest = new GetRequest(indexName, documentId); GetResponse getResponse = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT); String document = getResponse.getSourceAsString(); log.info("response document:{}" ,document); return document; } public void updatedocumentById(String indexName,EsDataSync esDataSync) throws IOException { UpdateRequest updateRequest = new UpdateRequest(indexName, esDataSync.getEsId()); updateRequest.timeout(Timevalue.timevalueSeconds(1)); updateRequest.doc(JsonUtil.toJsonStr(esDataSync.getData()), XContentType.JSON); UpdateResponse updateResponse = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT); log.info("status:{}", updateResponse.status()); } public void deletedocumentById(String indexName,String documentId) throws IOException { DeleteRequest deleteRequest = new DeleteRequest(indexName, documentId); DeleteResponse deleteResponse = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT); log.info("status:{}",deleteResponse.status()); } public BulkResponse addBatchdocument(String indexName, List > list) throws IOException { log.info("批量插入,索引:{},数据:{}", indexName, list); if(CollectionUtils.isEmpty(list)){ return null; } // 创建索引 createIndex(indexName); BulkRequest bulkRequest = new 
BulkRequest(); list.forEach(esDataSync ->{ bulkRequest.add(new IndexRequest(indexName) .id(esDataSync.getEsId()) .source(JsonUtil.toJsonStr(esDataSync.getData()),XContentType.JSON)); }); BulkResponse bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT); log.info("批量插入状态:{}", bulkResponse.status()); return bulkResponse; } public void searchTest() throws IOException { //1. 新建一个请求 确定查询哪一个索引 SearchRequest searchRequest = new SearchRequest("hdgp_*"); //2. 构建查询体对象 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); //3. 构建精准查询 FuzzyQueryBuilder termQueryBuilder = QueryBuilders.fuzzyQuery("userName", "峰"); //查询对象使用这个精确查询 searchSourceBuilder.query(termQueryBuilder); searchSourceBuilder.timeout(Timevalue.timevalueSeconds(60)); //这个请求使用这个查询体 searchRequest.source(searchSourceBuilder); //通过请求发出 返回的响应中 去取出对应的数据对象 SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT); //对应的数据对象是存储在数组中的 循环遍历即可 for (SearchHit documentFields : searchResponse.getHits().getHits()) { System.out.println(documentFields.getSourceAsMap()); } } }
EsDataSync类:
package com.jesse.task.domain; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; import java.io.Serializable; @Data @NoArgsConstructor @ToString public class EsDataSyncimplements Serializable{ private String esId; private TEntity data; private String indexName; public EsDataSync(String esId, TEntity data) { this.esId = esId; this.data = data; } }
JsonUtil类:
package com.jesse.metadata.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
/**
 * Static JSON helpers backed by one shared, pre-configured Jackson {@link ObjectMapper}.
 *
 * <p>The mapper registers {@link JavaTimeModule}, writes dates as timestamps with
 * nanosecond timestamps disabled, and does not fail on unknown properties when reading.
 */
public final class JsonUtil {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .disable(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS)
            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true)
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    /** Utility class; not meant to be instantiated. */
    private JsonUtil() {
    }

    /**
     * Serializes an object to its JSON string form.
     *
     * @param obj object to serialize
     * @return the JSON text
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    public static String toJsonStr(Object obj) {
        try {
            return OBJECT_MAPPER.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deserializes JSON text into an instance of the given class.
     *
     * @param str   JSON text to parse
     * @param clazz target type
     * @param <T>   type of the returned object
     * @return the parsed object
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if parsing fails
     */
    public static <T> T readJson(String str, Class<T> clazz) {
        try {
            return OBJECT_MAPPER.readValue(str, clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}



