栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Canal CDC

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Canal CDC

文章目录

1.Canal介绍

1.1 Mysql 的binlog介绍1.2 Canal 的运行原理1.3 Canal使用场景 2.Mysql 的配置准备3.Canal 的准备4.Canal 数据结构分析5.Java 代码6.Kafka 测试

1.Canal介绍

阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。

1.1 Mysql 的binlog介绍

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。

binlog可分为STATEMENT, MIXED, ROW

statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。

row:行级, binlog 会记录每次操作后每行记录的变化。

mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照ROW 的方式进行处理

Canal 想做监控分析,选择 row 格式比较合适。

1.2 Canal 的运行原理


Mysql的主从复制:

    Master改变数据, 写入到二进制文件中slave 从master 发送dump协议, 读取二进制文件到自己的relay logslave读取relay log到自己的数据库

canal就是将自己伪装为slave

1.3 Canal使用场景

1> 进行异地数据库之间的同步框架


2> 更新缓存, 实现缓存和数据库的一致性

3> 抓取业务表的新增变化数据,用于制作实时统计(我们就是这种场景)

2.Mysql 的配置准备
CREATE TABLE user_info(
`id` VARCHAR(255),
`name` VARCHAR(255),
`sex` VARCHAR(255)
);
vim /etc/my.cnf

systemctl restart mysqld
3.Canal 的准备


修改配置



防火墙关闭

4.Canal 数据结构分析


发送的是Message, 由很多Entry组成, 一个Entry对应一个Sql命令

Entry: TableName, EntryTyple, Storevalue, RowChange

RowChange为反序列化后的数据, 如果要使用的话必须通过Storevalue反序列化为RowChange后才可以使用

5.Java 代码

maven

       
            com.alibaba.otter
            canal.client
            1.0.25
        
public class Test {
    public static void main(String[] args)throws Exception {
        // Canal中的数据结构: Message - Entry(对应一个Sql) - TableName, EntryType, Storevalue-RowChange
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.66.66", 11111), "example", "", "");

        while (true){
            // 连接
            connector.connect();
            // 订阅
            connector.subscribe("cdc_test.*");
            // 获取数据
            Message message = connector.get(100);

            // 获取Entry 集合
            List entries = message.getEntries();

            if (entries.size() <= 0){
                System.out.println("稍等一会.........");
                Thread.sleep(1000);
            }else {
                for (CanalEntry.Entry entry : entries) {
                    // 1.获取表名
                    String tableName = entry.getHeader().getTableName();
                    // 2.获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    // 3.获取序列化后的数据
                    ByteString storevalue = entry.getStorevalue();
                    // 判断当前的类型是否为Row
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
                        // 5.反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storevalue);
                        // 6.获取EventType
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7.获取数据集
                        List rowDatasList = rowChange.getRowDatasList();
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            JSONObject beforeJson = new JSONObject();
                            List beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeJson.put(column.getName(), column.getValue());
                            }

                            JSONObject afterJson = new JSONObject();
                            List afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterJson.put(column.getName(), column.getValue());
                            }
                            System.out.println("Table:" + tableName + "eventType: " + eventType + "before: " + beforeJson + "after: " + afterJson);


                        }
                    }else {
                        System.out.println("当前数据类型为:" + entryType);
                    }

                }
            }
        }
    }
}

6.Kafka 测试

修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到 kafka


修改 Kafka 集群的地址

canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092

修改 instance.properties 输出到 Kafka 的主题以及分区数

# mq config
canal.mq.topic=canal_test
canal.mq.partitionsNum=1
# hash partition config
#canal.mq.partition=0
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic canal_test

向数据库添加数据后

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781136.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号