- 主从同步概述
- 主从同步流程
- 相关名词
- 配置数据同步流程
- CommitLog数据同步流程
- 异步复制
- 同步复制
Master主要用于处理生产者、消费者的请求和存储数据
Slave从Master同步所有数据到本地,具体作用提现如下:
- Broker服务高可用。一般生产环境不会部署两个主Broker节点和两个从Broker节点(也叫 2m2s),一个 Master宕机后,另一个 Master可以接管工作;如果两个 Master都宕机,消费者可以通过连接Slave继续消费。这样可以保证服务的高可用
- 提高服务性能。如果消费者从Master Broker拉取消息时,发现拉取消息的offset和CommitLog的物理offset相差太多,会转向Slave拉取消息,这样可以减轻Master的压力,从而提高性能
Broker同步数据的方式有两种:同步复制、异步复制
- 同步复制 是指 客户端发送消息到Master,Master将消息同步复制到Slave的过程,可以通过设置参数 brokerRole=BrokerRole.SYNC_MASTER 来实现。这种消息配置的可靠性很强,但是效率比较低,适用于金融、在线教育等对消息有强可靠需求的场景
- 异步复制 是指 客户端发送消息到 Master,再由 线程 HAService 异步同步到 Slave的过程,可以通过设置参数 brokerRole=BrokerRole.ASYNC_MASTER 来实现。这种消息配置的效率非常高,可靠性比同步复制差,适用于大部分业务场景
Broker主从同步数据有两种:配置数据和消息数据
配置数据主要包含Topic配置、消费者位点信息、延迟消息位点信息、订阅关系配置等
消息数据是生产者发送的消息,保存在 CommitLog 中,由 HAService 服务实时同步到Slave Broker中
Broker 主从同步的逻辑是通过 org.apache.rocketmq.broker.slave.SlaveSynchronize.syncAll()方法实现的。该方法在org.apache.rocketmq.broker.BrokerController.initialize)_方法中被初始化,每60s同步一次,并且同步周期不能修改
主从同步流程 相关名词 配置数据同步流程配置数据包含 4种类型:Topic 配置、消费者位点、延迟位点、订阅关系配置
每种配置数据由一个继承自ConfigManager的类来管理,继承关系如下:
Slave如何从Master同步这些配置呢?
- Master Broker在启动时,初始化一个BrokerOuterAPI,这个服务的功能包含Broker注册到Namesrv、Broker从Namesrv解绑、获取Topic配置信息、获取消费者位点信息、获取延迟位点信息及订阅关系等
- Slave Broker在初始化Controller的定时任务时,会初始化SlaveSynchronize服务,每60s调用一次SlaveSynchronize.syncAll()方法
- syncAll()方法依次调用4种配置数据(Topic配置、消费者位点、延迟位点、订阅关系配置)的同步方法同步全量数据
- syncAll()中执行的4个方法都通过Remoting模块同步调用BrokerOuterAPI,并从Master Broker获取数据,保存到Slave中
- Topic 配置和订阅关系配置随着保存内存信息的同时持久化到磁盘上;消费者位点通过 BrokerController 初始化定时任务持久化到磁盘上;延迟位点信息通过ScheduleMessageService定时将内存持久化到磁盘上
CommitLog 的数据同步分为同步复制和异步复制两种
同步复制是生产者生产消息后,等待Master Broker将数据同步到Slave Broker后,再返回生产者数据存储状态
异步复制是生产者在生产消息后,不用等待Slave同步,直接返回Master存储结果
Master Broker启动时 会启动 HAService.AcceptSocketService服务,当监听到来自 Slave 的注册请求时 会创建一个 HAConnection,同时 HAConnection 会创建ReadSocketService和WriteSocketService两个服务并启动,开始主从数据同步
ReadSocketService接收Slave同步数据请求,并将这些信息保存在HAConnection中
WriteSocketService根据HAConnection中保存的Slave同步请求,从CommitLog中查询数据,并发送给Slave
ReadSocketService和 WriteSocketService是两个独立工作的线程服务,它们通过HAConnection中的公共变量将CommitLog同步给Slave
同步复制在 CommitLog 将消息存储到 Page Cache 后,会调用 CommitLog.handleHA()方法处理同步复制
一个 Master Broker 可以配置多个 Slave Broker,当需要同步数据时,通过service.getWaitNotifyObject().wakeupAll() 来唤醒 全部的Slave同步
虽然多个Slave都同步了数据,但是一旦Master Broker不可用时,消费者只会从一个Slave中拉取消息,所以生产环境建议Slave不要配置太多
Slave 在发送请求数据的 Request 时,会带上 Slave 请求的位点HAConnection.slaveRequestOffset,该值如果等于-1(默认),则表示没有 Slave 请求过位点数据
ReadSocketService后台服务不断接收Slave Broker上报的offset,每上报一次都通知HAService.notifyTransferSome()方法,判断 Slave 同步的位点是否大于 Master 标记的已同步位点
如果大于则更新标记值,同时通知同步复制服务 GroupTransferService
GroupTransferService扫描所有的同步请求,依次判断哪些GroupCommitRequest的待同步复制的位点是比已同步位点小的,释放 GroupCommitRequest 中的锁,消息处理线程可以将消息存储成功的结果返回给生产者
消费队列文件(Consume Queue)和索引文件(Index File)为什么没有同步给Slave呢?
这两个文件都可以在Slave Broker上追加CommitLog后由ReputMessageService进行创建,所以不需要同步



