- 1. 概述
- 2. Canal的组成
- 3. Server解析
- 3.1 Instance组件
- 3.1.1 metaManager
- 3.1.2 EventStore
- 3.1.3 EventSink
- 3.1.4 EventParser
- 3.1.5 Instance总结
- 3.2 CanalServerWithEmbedded 组件
- 3.3 CanalServerWithNetty 组件
- 4. Client解析
- 5. 结束语
Canal是阿里巴巴开发的一项开源组件,通过将自己伪装成MySQL Slave节点来接收Master节点的Binlog日志,然后就可以实现我们的需求,例如:同步到MySQL从库、同步到Elasticsearch、同步到Kafka 等等。
Canal主要分成 Server(服务端)和 Client(客户端)
- Server 主要由 Instance 构成,1个Server可以有多个Instance,每个Instance由包括 EventParser、EventSink、EventStore、metaManager 四个部分组成,如下图所示:
EventParser:连接MySQL,充当 Slave 和 Master 进行交互,并且实现协议解析
EventSink:EventParser 和 EventStore 的连接器,可以对数据进行一定的处理
EventStore:数据存储,内部通过 Event[] 数组记录,因此数据是存在内存
metaManager:维护 conf/Instance名称/meta.dat 文件,记录了Client消费的进度(binlog文件名、position等等)
- Client 称为客户端,官方提供的Client Demo是一段Java程序,使用NIO连接 Server 获取Binlog进行处理。此外,阿里也开发了几个现成的Adapter(适配器),通过配置就能实现数据同步到Log、ES、Kafka等等,不需要编写任何的Client代码。
元数据管理器,主要是对Client、Position信息进行记录。这里使用的是 FileMixedmetaManager 文件型管理器,它继承了 MemorymetaManager 内存型管理器,它们的关系如下:
启动的时候,会优先调用 MemorymetaManager.start() 方法,初始化Map对象,主要用于记录每个 Client 与 Position 的对应关系。
public class MemorymetaManager extends AbstractCanalLifeCycle implements CanalmetaManager {
protected Map> destinations;
protected Map batches;
protected Map cursors;
public void start() {
super.start();
batches = MigrateMap.makeComputingMap(MemoryClientIdentityBatch::create);
cursors = new MapMaker().makeMap();
destinations = MigrateMap.makeComputingMap(destination -> Lists.newArrayList());
}
}
然后,才是调用 FileMixedmetaManager 自己的启动逻辑:
- 判断 …/conf 目录是否存在,不存在就创建
- 判断 …/conf 目录下是否存在当前 Instance 的目录,不存在就创建
- 创建频率为1秒的定时任务,定时把内存的meta数据刷新到 meta.dat 文件
这里使用的类型为 MemoryEventStoreWithBuffer,其内部维护名称为 entries 的 Event 数组对象,默认大小为 16384。
EventStore 有 3 种主要操作:Put、Get、Ack
- Put:添加数据,通过 putSequence 标记最后一次写操作的位置,默认 -1
- Get:读取数据,由Client发起,通过 getSequence 标记当前读取到的位置,默认 -1
- Ack:确认读取,由Client发起,在Get之后调用,表示Get的数据已经消费完毕,此操作会删除数据。通过 ackSequence 标记最后 Ack 的位置,默认 -1
举个例子说明他们之间的关系:
假设初始状态下,entries 数组长度等于10
执行 put 10 笔数据,此时 putSequence = 9
执行 get 8 笔数据,此时 getSequence = 7
执行 act 5 笔数据,此时 ackSequence = 4
特殊情况一:entries满了之后,继续 put 2 笔数据,会发生什么?
EventStore 计算下标的逻辑是调用 getIndex() 方法,当下标超过最大值,会从新开始计算。也就是说,继续 put 2 笔数据会放在下标 0 - 1
特殊情况二:以上面最后一个截图为例,继续 put 8 笔数据,会发生什么?
- Put 操作之前,EventStore 会先调用 checkFreeSlotAt 进行检查,因为put 8 笔数据会放在 0 - 7,已经超过 ack 的进度(4),此时会返回 false 给 EventSink,并且不会继续执行 put 操作。
- EventSink 收到 false 结果会进入阻塞状态,直到 put 成功为止。
这里使用的类型为 EntryEventSink,内部的核心方法是 sinkData() ,作用是对Event进行过滤筛选,然后调用 eventStore.tryPut(),把事件传递给EventStore进行存储。
3.1.4 EventParserEventParser 是独立一个线程在持续的运行,它的启动代码在 AbstractEventParser.start(),主要做以下工作:
-
计算最新的position位置
– 优先读取 meta.data 记录的 position 信息
– 如果没有 meta.data 文件,或者里面没有数据,说明当前是第一次启动,此时会执行 show master status 取出最新的 position -
开始dump
– 把上一步取到的 position 发送给 master,表示从当前位置开始 dump
– 循环拉取binlog并且进行解析
– 投递binlog到 EventSink,最后再由 EventSink Put 到 EventStore
前面我们一直说,Server内部可以包含多个Instance,这边的Server可以理解成CanalServerWithEmbedded 对象,其内部通过canalInstances属性来记录各个Instance。
注意:CanalServerWithEmbedded 名称太长了,后面简称为 embeddedServer
//Key是InstanceName,Value是Instance对象
private Map canalInstances;
embeddedServer 内部有多个重要方法,比如:get()、getWithoutAck()、subscribe()、rollback()、unsubscribe()。这些方法乍一看,有点像Client调用的方法,为什么在 embeddedServer 也会有呢?
以get()方法为例,Client执行时会通过NIO将消息发送到nettyServer,而nettyServer进一步调用 embeddedServer 的对应get()方法。其他方法也同理。
接下来,就是介绍 nettyServer
3.3 CanalServerWithNetty 组件CanalServerWithNetty 简称 nettyServer,顾名思义,这是基于 Netty 开发的服务端对象,其业务逻辑主要由几个Handler实现,如下所示:
bootstrap.setPipelineFactory(() -> {
ChannelPipeline pipelines = Channels.pipeline();
//用于解码,这里不用管
pipelines.addLast(FixedHeaderframeDecoder.class.getName(), new FixedHeaderframeDecoder());
//1. Client首次连接时(握手),这里会将加密种子返回给Client,加密种子用于下一步的密码加密
pipelines.addLast(HandshakeInitializationHandler.class.getName(),
new HandshakeInitializationHandler(childGroups));
//2. 校验账号密码(在canal.properties文件设置,分别为 canal.user、canal.passwd)
//这里校验通过了才算连接成功
pipelines.addLast(ClientAuthenticationHandler.class.getName(),
new ClientAuthenticationHandler(embeddedServer));
//3. 处理业务逻辑:get、getWithAck、subscribe 等等
SessionHandler sessionHandler = new SessionHandler(embeddedServer);
pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
return pipelines;
});
nettyServer 与 embeddedServer 的关系,咱们在上一步也讲过了。nettyServer 接收到 Client 的业务请求,会调用 embeddedServer 的对应方法。
也就是说,nettyServer 负责对外监听,embeddedServer 负责执行业务
4. Client解析根据官方提供的 Demo,Client 通常是一段 NIO 程序,它的业务一般分成这几个步骤:
- connect:连接 nettyServer
- subscribe:订阅需要监听 binlog 的表
- 开启死循环,执行下面这几步↓↓↓
- get:获取 binlog 增量数据
- ack:确认操作
- rollback:回滚操作
- 其他:针对取到的 binlog 执行特定的业务需求
Client 与 Server 的交互,可以用这张图表示:
5. 结束语如果你原先对 Canal 只是停留在使用和配置的程度,那么读完这篇文章,应该会有更深层次的理解。这里呢,对源代码没怎么讲解,那样1篇文章是讲不完的,后续我会继续深究 Canal 实现的细节点,到时主要就是结合源码了。



