栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

mysql与es实时同步之数据库双写

mysql与es实时同步之数据库双写

mysql与es实时同步之数据库双写
  • 简介
  • 第一步-获取更新
  • 第二步-同步es

简介

当在对mysql表进行insert,update时,同时使用es api对es数据库进行同步修改,保证mysql和es数据的一致性。

第一步-获取更新

新增/修改数据时将相关信息推送到kafka,具体可以看EsModel的字段

@Data
@Accessors(chain = true)
public class EsModel {

    //mysql表名
    String table;

    //mysql主键id,与es的id对应
    Long id;

    //mysql主键id列表,与es的id对应
    List ids;

    //es索引
    String esIndex;

}

然后发送消息

    private void sendKafka(String topic, String message) {
        ListenableFuture> future = kafkaTemplate.send(topic, message);
        future.addCallback(m -> log.info("kafka_send_success:{}", m), e -> log.error("kafka_send_error:{}", e.getMessage(), e));
    }
第二步-同步es

首先kafka消费者根据id和表名去mysql数据库取出修改或新增后的数据,为了可以从不同表取出数据:
首先要传递表名,使用${table}而不是#{table},因为后者会被当成字符串处理;
使用user这样的单个model接收数据肯定是不行的,这里使用了hashmap作为数据存储格式。

List> selectData(@Param("table") String table, @Param("ids") List ids);



application.yml配置文件也需修改
避免部分字段为null时不返回数据 
mybatis:
  configuration:
    call-setters-on-nulls: true

调用mapper获取mysql数据之后,然后调用es客户端向es发送请求,完成同步。
这里使用的Indexrequest,它可以兼容插入和更新两种操作,当es索引中没有传入id对应文档时候会插入,反之就会update。

    @KafkaListener(topics = "${spring.kafka.topic.elasticsearch}")
    @Transactional(rollbackFor = Exception.class)
    public void esManageListener(String message, Acknowledgment ack) {
        log.info("kafka_received: message={}", message);
        try{
            EsModel esModel = JSON.parseObject(message, EsModel.class);
            List> mysqlData = esMapper.selectData(esModel.getTable(), esModel.getIds());
            BulkRequest bulkRequest = new BulkRequest();
            for (Map data : mysqlData){
                String dataJson = JSONObject.toJSONString(data, SerializerFeature.WriteMapNullValue);
                IndexRequest indexRequest = new IndexRequest()
                        .index(esModel.getEsIndex())
                        .id(String.valueOf(data.get("id")))
                        .type("_doc")
                        .source(dataJson, XContentType.JSON);
                bulkRequest.add(indexRequest);
            }
            BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
        }catch (Exception e){
            e.printStackTrace();
            // TODO: 2021/9/7  log
        }finally{
            ack.acknowledge();
        }
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350312.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号