- 简介
- 第一步-获取更新
- 第二步-同步es
当在对mysql表进行insert,update时,同时使用es api对es数据库进行同步修改,保证mysql和es数据的一致性。
第一步-获取更新新增/修改数据时将相关信息推送到kafka,具体可以看EsModel的字段
@Data
@Accessors(chain = true)
public class EsModel {
//mysql表名
String table;
//mysql主键id,与es的id对应
Long id;
//mysql主键id列表,与es的id对应
List ids;
//es索引
String esIndex;
}
然后发送消息
private void sendKafka(String topic, String message) {
ListenableFuture> future = kafkaTemplate.send(topic, message);
future.addCallback(m -> log.info("kafka_send_success:{}", m), e -> log.error("kafka_send_error:{}", e.getMessage(), e));
}
第二步-同步es
首先kafka消费者根据id和表名去mysql数据库取出修改或新增后的数据,为了可以从不同表取出数据:
首先要传递表名,使用${table}而不是#{table},因为后者会被当成字符串处理;
使用user这样的单个model接收数据肯定是不行的,这里使用了hashmap作为数据存储格式。
List
调用mapper获取mysql数据之后,然后调用es客户端向es发送请求,完成同步。
这里使用的Indexrequest,它可以兼容插入和更新两种操作,当es索引中没有传入id对应文档时候会插入,反之就会update。
@KafkaListener(topics = "${spring.kafka.topic.elasticsearch}")
@Transactional(rollbackFor = Exception.class)
public void esManageListener(String message, Acknowledgment ack) {
log.info("kafka_received: message={}", message);
try{
EsModel esModel = JSON.parseObject(message, EsModel.class);
List


