栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Canal全方位深入讲解,看这一篇就够了

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Canal全方位深入讲解,看这一篇就够了

文章目录
  • 1. 概述
  • 2. Canal的组成
  • 3. Server解析
    • 3.1 Instance组件
      • 3.1.1 metaManager
      • 3.1.2 EventStore
      • 3.1.3 EventSink
      • 3.1.4 EventParser
      • 3.1.5 Instance总结
    • 3.2 CanalServerWithEmbedded 组件
    • 3.3 CanalServerWithNetty 组件
  • 4. Client解析
  • 5. 结束语

1. 概述

Canal是阿里巴巴开发的一项开源组件,通过将自己伪装成MySQL Slave节点来接收Master节点的Binlog日志,然后就可以实现我们的需求,例如:同步到MySQL从库、同步到Elasticsearch、同步到Kafka 等等。

2. Canal的组成

Canal主要分成 Server(服务端)和 Client(客户端)

  • Server 主要由 Instance 构成,1个Server可以有多个Instance,每个Instance由包括 EventParser、EventSink、EventStore、metaManager 四个部分组成,如下图所示:


EventParser:连接MySQL,充当 Slave 和 Master 进行交互,并且实现协议解析

EventSink:EventParser 和 EventStore 的连接器,可以对数据进行一定的处理

EventStore:数据存储,内部通过 Event[] 数组记录,因此数据是存在内存

metaManager:维护 conf/Instance名称/meta.dat 文件,记录了Client消费的进度(binlog文件名、position等等)

  • Client 称为客户端,官方提供的Client Demo是一段Java程序,使用NIO连接 Server 获取Binlog进行处理。此外,阿里也开发了几个现成的Adapter(适配器),通过配置就能实现数据同步到Log、ES、Kafka等等,不需要编写任何的Client代码。
3. Server解析 3.1 Instance组件 3.1.1 metaManager

元数据管理器,主要是对Client、Position信息进行记录。这里使用的是 FileMixedmetaManager 文件型管理器,它继承了 MemorymetaManager 内存型管理器,它们的关系如下:

启动的时候,会优先调用 MemorymetaManager.start() 方法,初始化Map对象,主要用于记录每个 Client 与 Position 的对应关系。

public class MemorymetaManager extends AbstractCanalLifeCycle implements CanalmetaManager {

    protected Map>              destinations;
    protected Map batches;
    protected Map                  cursors;

    public void start() {
        super.start();

        batches = MigrateMap.makeComputingMap(MemoryClientIdentityBatch::create);

        cursors = new MapMaker().makeMap();

        destinations = MigrateMap.makeComputingMap(destination -> Lists.newArrayList());
    }
}

然后,才是调用 FileMixedmetaManager 自己的启动逻辑:

  • 判断 …/conf 目录是否存在,不存在就创建
  • 判断 …/conf 目录下是否存在当前 Instance 的目录,不存在就创建
  • 创建频率为1秒的定时任务,定时把内存的meta数据刷新到 meta.dat 文件
3.1.2 EventStore

这里使用的类型为 MemoryEventStoreWithBuffer,其内部维护名称为 entries 的 Event 数组对象,默认大小为 16384。

EventStore 有 3 种主要操作:Put、Get、Ack

  • Put:添加数据,通过 putSequence 标记最后一次写操作的位置,默认 -1
  • Get:读取数据,由Client发起,通过 getSequence 标记当前读取到的位置,默认 -1
  • Ack:确认读取,由Client发起,在Get之后调用,表示Get的数据已经消费完毕,此操作会删除数据。通过 ackSequence 标记最后 Ack 的位置,默认 -1

举个例子说明他们之间的关系:

假设初始状态下,entries 数组长度等于10

执行 put 10 笔数据,此时 putSequence = 9

执行 get 8 笔数据,此时 getSequence = 7

执行 act 5 笔数据,此时 ackSequence = 4

特殊情况一:entries满了之后,继续 put 2 笔数据,会发生什么?

EventStore 计算下标的逻辑是调用 getIndex() 方法,当下标超过最大值,会从新开始计算。也就是说,继续 put 2 笔数据会放在下标 0 - 1

特殊情况二:以上面最后一个截图为例,继续 put 8 笔数据,会发生什么?

  • Put 操作之前,EventStore 会先调用 checkFreeSlotAt 进行检查,因为put 8 笔数据会放在 0 - 7,已经超过 ack 的进度(4),此时会返回 false 给 EventSink,并且不会继续执行 put 操作。
  • EventSink 收到 false 结果会进入阻塞状态,直到 put 成功为止。
3.1.3 EventSink

这里使用的类型为 EntryEventSink,内部的核心方法是 sinkData() ,作用是对Event进行过滤筛选,然后调用 eventStore.tryPut(),把事件传递给EventStore进行存储。

3.1.4 EventParser

EventParser 是独立一个线程在持续的运行,它的启动代码在 AbstractEventParser.start(),主要做以下工作:

  • 计算最新的position位置
    – 优先读取 meta.data 记录的 position 信息
    – 如果没有 meta.data 文件,或者里面没有数据,说明当前是第一次启动,此时会执行 show master status 取出最新的 position

  • 开始dump
    – 把上一步取到的 position 发送给 master,表示从当前位置开始 dump
    – 循环拉取binlog并且进行解析
    – 投递binlog到 EventSink,最后再由 EventSink Put 到 EventStore

3.1.5 Instance总结

3.2 CanalServerWithEmbedded 组件

前面我们一直说,Server内部可以包含多个Instance,这边的Server可以理解成CanalServerWithEmbedded 对象,其内部通过canalInstances属性来记录各个Instance。

注意:CanalServerWithEmbedded 名称太长了,后面简称为 embeddedServer

	//Key是InstanceName,Value是Instance对象
    private Map canalInstances;

embeddedServer 内部有多个重要方法,比如:get()、getWithoutAck()、subscribe()、rollback()、unsubscribe()。这些方法乍一看,有点像Client调用的方法,为什么在 embeddedServer 也会有呢?

以get()方法为例,Client执行时会通过NIO将消息发送到nettyServer,而nettyServer进一步调用 embeddedServer 的对应get()方法。其他方法也同理。

接下来,就是介绍 nettyServer

3.3 CanalServerWithNetty 组件

CanalServerWithNetty 简称 nettyServer,顾名思义,这是基于 Netty 开发的服务端对象,其业务逻辑主要由几个Handler实现,如下所示:

        bootstrap.setPipelineFactory(() -> {
            ChannelPipeline pipelines = Channels.pipeline();
            
            //用于解码,这里不用管
            pipelines.addLast(FixedHeaderframeDecoder.class.getName(), new FixedHeaderframeDecoder());
            
            //1. Client首次连接时(握手),这里会将加密种子返回给Client,加密种子用于下一步的密码加密
            pipelines.addLast(HandshakeInitializationHandler.class.getName(),
                new HandshakeInitializationHandler(childGroups));
                
            //2. 校验账号密码(在canal.properties文件设置,分别为 canal.user、canal.passwd)
            //这里校验通过了才算连接成功
            pipelines.addLast(ClientAuthenticationHandler.class.getName(),
                new ClientAuthenticationHandler(embeddedServer));
                
			//3. 处理业务逻辑:get、getWithAck、subscribe 等等
            SessionHandler sessionHandler = new SessionHandler(embeddedServer);
            pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
            
            return pipelines;
        });

nettyServer 与 embeddedServer 的关系,咱们在上一步也讲过了。nettyServer 接收到 Client 的业务请求,会调用 embeddedServer 的对应方法。

也就是说,nettyServer 负责对外监听,embeddedServer 负责执行业务

4. Client解析

根据官方提供的 Demo,Client 通常是一段 NIO 程序,它的业务一般分成这几个步骤:

  • connect:连接 nettyServer
  • subscribe:订阅需要监听 binlog 的表
  • 开启死循环,执行下面这几步↓↓↓
  • get:获取 binlog 增量数据
  • ack:确认操作
  • rollback:回滚操作
  • 其他:针对取到的 binlog 执行特定的业务需求

Client 与 Server 的交互,可以用这张图表示:

5. 结束语

如果你原先对 Canal 只是停留在使用和配置的程度,那么读完这篇文章,应该会有更深层次的理解。这里呢,对源代码没怎么讲解,那样1篇文章是讲不完的,后续我会继续深究 Canal 实现的细节点,到时主要就是结合源码了。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/324583.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号