- 配置类不用改,需要改的地方就是业务需求的地方
- 每两分钟同步一次,加了个定时任务
- 自已已应用到测试环境,是能正常运行的
package com.example.gauditdemo.config;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ESConfigClient {
public final Logger logger = LoggerFactory.getLogger(this.getClass());
@Bean
public RestHighLevelClient esClient(){
return new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));
}
}
package com.example.gauditdemo.utils;
import com.example.gauditdemo.config.ESConfigClient;
import com.example.gauditdemo.dao.OperationDao;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizevalue;
import org.elasticsearch.common.unit.Timevalue;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
@Component
public class ElasticSearchUtil {
public final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ESConfigClient esConfigClient;
private BulkProcessor bulkProcessor;
@Resource
private OperationDao operationDao;
@Value("${audit.index.prefix.env}")
private String auditIndexPrefixEnv;
@PostConstruct
public void init() {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
//重写beforeBulk,在每次bulk request发出前执行,在这个方法里面可以知道在本次批量操作中有多少操作数
int numberOfActions = request.numberOfActions();
logger.info("同步数量 Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
//重写afterBulk方法,每次批量请求结束后执行,可以在这里知道是否有错误发生。
if (response.hasFailures()) {
logger.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage());
} else {
logger.info("写入成功 Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
//重写方法,如果发生错误就会调用。
logger.error("写入失败 Failed to execute bulk", failure);
}
};
//在这里调用build()方法构造bulkProcessor,在底层实际上是用了bulk的异步操作
this.bulkProcessor = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
esConfigClient.esClient().bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
}), listener)
// 1000条数据请求执行一次bulk
.setBulkActions(1000)
// 3mb的数据刷新一次bulk
.setBulkSize(new ByteSizevalue(3L, ByteSizeUnit.MB))
// 并发请求数量, 0不并发, 1并发允许执行
.setConcurrentRequests(1)
// 固定1s必须刷新一次
.setFlushInterval(Timevalue.timevalueSeconds(5L))
// 重试5次,间隔100s
.setBackoffPolicy(BackoffPolicy.constantBackoff(Timevalue.timevalueSeconds(100L), 5))
.build();
}
@PreDestroy
public void destroy() {
try {
bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.error("Failed to close bulkProcessor", e);
}
logger.info("bulkProcessor closed!");
}
@Scheduled(cron = "0 */2 * * * ?")
public void insertIntoEs() {
long startTime = System.currentTimeMillis();
List
二、自己过程中遇到的问题及解决办法
- 报错 bulk has been already closed 原因及解决办法:因为在定时任务那里,bulkProcessor添加完到es中,我就将es关闭掉了,导致下一次定时任务到的时候,就会报这个错。解决办法:就是将 client.close() 删除掉就可以了
- mysql时间类型与es中时间类型不一致 原因及解决办法:mysql中有一个时间date类型,同步到es中,这个时间类型在es中是text类型,导致查询会报错,预想应该在es中也是date类型才对。解决办法:mybatis中查询出来的结果将时间进行转换,代码中有,我写了注释。最好将es中的索引先删除掉,然后es会自动创建索引和字段类型的。
- es中时间比mysql中查询出来的时间少了8个小时 原因及解决办法:同步的时候发现es中时间少8小时。解决办法:mybatis中查询出来的结果将时间加8个小时,代码中有,可以参考一下,我写了注释。
- 定时任务起效果,需要三个条件 原因及解决办法:自己搜索0.0
- 测试的时候,我是在windwos中安装的es和kibana 原因及解决办法:比较方便,方便我调式
- 为什么同步es中要用bulkProcessor 原因及解决办法:当你数据量特别大的时候,不用bulkProcessor,如果一次性同步几百万条数据,会将es弄崩掉的。解决办法:加上bulkProcessor,可以1000条同步一次,或者几s自动同步一次,也比较方便
- mybatis中查询使用流查询,添加 fetchSize="1000" 原因及解决办法:mybatis中一次性从mysql中查询几万条数据,会造成数据库压力。解决办法:加上fetchSize,流查询,缓解数据库压力。
- 想不到还有什么坑,想到了我再添加,有不清楚的可以下面评论,看到我会解答的。
定时同步mysql到es的demo项目



