栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

Canal通过Kafka实现MySQL与Redis同步

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Canal通过Kafka实现MySQL与Redis同步

文章目录
  • Canal通过Kafka实现MySQL与Redis同步
    • Zookeeper 安装
    • Kafka 安装
    • 修改 Canal 文件配置
    • 编写实体类和 Kafka 消费者
    • 验证


Canal通过Kafka实现MySQL与Redis同步

Docker 环境安装、MySQL 安装、Redis 安装、Canal 安装、MySQL文件配置和 Canal 文件配置请移步 Canal通过TCP实现MySQL与Redis同步 查看。

Zookeeper 安装
  • 下载 Zookeeper3.7.0 的 docker 镜像:
docker pull zookeeper:3.7.0
  • 使用如下命令启动 Zookeeper 服务:
docker run --name zookeeper -p 2181:2181 --restart always -d zookeeper:3.7.0

–restart always 的设置可以使 docker 启动时同时启动 Zookeeper。

Kafka 安装
  • 下载 kafka2.13-2.8.1 的 docker 镜像:
docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 
-e KAFKA_ZOOKEEPER_CONNECT=10.0.0.4:2181 -e 
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.4:9092 -e 
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka:2.13-2.8.1
  • 进入 kafka 容器:
docker exec -it kafka /bin/sh
  • 进入 Kafka 安装目录下:
cd opt/kafka_2.13-2.8.1
  • 创建 topic
kafka-topics.bat --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic mall
修改 Canal 文件配置
  • 进入 Canal 容器:
docker exec -it canal /bin/bash
  • 修改 canal 的配置文件 canal.properties:
vi canal-server/conf/instance.properties


  • 修改 canal 的配置文件 canal.properties:
vi canal-server/conf/example/instance.properties

编写实体类和 Kafka 消费者
  • 引入 Kafka 依赖:

    org.springframework.kafka
    spring-kafka

  • 在 application.yml 文件增加 Kafka 配置:
kafka:
    bootstrap-servers: 10.0.0.4:9092
    consumer:
      group-id: mall-master
  • 创建 CanalBean 对象接收 Kafka 消息:
package com.macro.mall.canal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;


@Data
@AllArgsConstructor
@NoArgsConstructor
public class CanalBean {

    
    private List data;

    
    private String database;

    private long es;

    
    private int id;

    
    private boolean isDdl;

    
    private MysqlType mysqlType;

    
    private String old;

    
    private List pkNames;

    
    private String sql;

    private SqlType sqlType;

    
    private String table;

    private long ts;

    
    private String type;
}
package com.macro.mall.canal;



public class MysqlType {
    private String id;
    private String username;
    private String password;
    private String icon;
    private String email;
    private String nickName;
    private String note;
    private String createTime;
    private String loginTime;
    private String status;
}
package com.macro.mall.canal;


public class SqlType {
    private int id;
    private int username;
    private int password;
    private int icon;
    private int email;
    private int nickName;
    private int note;
    private int createTime;
    private int loginTime;
    private int status;
}
  • 创建 Kafka 消费者:
package com.macro.mall.canal;


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;


@Component
@Slf4j
public class CanalConsumer {

    @Autowired
    private RedisTemplate redisTemplate;

    private static String REDIS_DATABASE = "mall";
    private static String REDIS_KEY_ADMIN = "ums:admin";

    private static String insert = "INSERT";
    private static String update = "UPDATE";


    @KafkaListener(topics = "mall")
    public void receive(ConsumerRecord consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{}, key:{}, 分区位置:{}, 下标:{}, value:{}", consumer.topic(), consumer.key(),
                consumer.partition(), consumer.offset(), value);

        // 转换为javaBean
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);

        // 获取类型
        String type = canalBean.getType();

        // 获取是否是DDL语句
        boolean isDdl = canalBean.isDdl();

        // 不是DDL语句
        if (!isDdl) {
            List UmsAdmins = canalBean.getData();

            if (insert.equals(type) || update.equals(type)) {
                //新增或更新语句
                for (UmsAdmin umsAdmin : UmsAdmins) {
                    //新增到redis中
                    redisTemplate.opsForValue().set(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + umsAdmin.getUsername(),
                            JSONObject.toJSONString(umsAdmin));
                }
            } else {
                // 删除语句
                for (UmsAdmin umsAdmin : UmsAdmins) {
                    //从redis中删除
                    redisTemplate.delete(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + umsAdmin.getUsername());
                }
            }
        }
    }
}
验证
  • 修改前

MySQL

Redis

  • 修改后

日志

MySQL

Redis

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/882540.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号