栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RocketMQ一些细节执行流程

RocketMQ一些细节执行流程

NameServer

功能

一 是维护Broker的服务地址并进行及时的更新。

二 是给Producer和Consumer提供服务获取Broker列表

启动流程

整个NameServer的核心就是一个NamesrvController对象。响应客户端请求的。
在创建NamesrvController对象时,有两个关键的配置
NamesrvConfig 这个是NameServer自己运行需要的配置信息。 NettyServerConfig 包含Netty服务端的配置参数,默认占用了9876端口。可以 在配置文件中覆盖。
然后在启动服务时,启动几个重要组件:
RemotingServer 这个就是用来响应请求的。

还有一个定时任务会定时扫描不活动的Broker。这个Broker管理是通过 routeInfoManager这个功能组件。
在关闭服务时,关闭了四个东西
RemotingServer

remotingExecutor Netty服务线程池;

scheduledExecutorService 定时任务;

fileWatchService 这个是用来跟踪TLS配置的。这是跟权限相关的。

 

Broker启动  

围绕一个BrokerController对象,先创建,然后再启动。
首先:在BrokerStartup.createBrokerController方法中可以看到Broker的几个核 心配置: 
 BrokerConfig 
 NettyServerConfig  :Netty服务端占用了10911端口。同样也可以在配置文件中覆盖。
 NettyClientConfig  
 MessageStoreConfig
然后:在BrokerController.start方法可以看到启动了一大堆Broker的核心服务
this.messageStore.start();启动核心的消息存储组件
this.remotingServer.start(); this.fastRemotingServer.start(); 启动两个Netty服务
this.brokerOuterAPI.start();启动客户端,往外发请求
BrokerController.this.registerBrokerAll: 向NameServer注册心跳。
this.brokerStatsManager.start(); this.brokerFastFailure.start();这也是一些负责具体业务的功能组件

Broker注册 

 BrokerController.this.registerBrokerAll方法会发起向NameServer注册心跳。启 动时会立即注册,同时也会启动一个线程池,以10秒延迟,默认30秒的间隔 持续向 NameServer发送心跳。
BrokerController.this.registerBrokerAll这个方法就是注册心跳的入口

然后,在NameServer中也会启动一个定时任务,扫描不活动的Broker。具体观察 NamesrvController.initialize方法 

Producer  

Producer有两种
一种是普通发送者:DefaultMQProducer。这个只需要构建一个Netty客户端, 往Broker发送消息就行了。注意,异步回调只是在Producer接收到Broker的响 应后自行调整流程,不需要提供Netty服务。

另一种是事务消息发送者: TransactionMQProducer。这个需要构建一个 Netty客户端,往Broker发送消息。同时也要构建Netty服务端,供Broker回查 本地事务状态。

关于Borker路由信息的管理: Producer需要拉取Broker列表,然后跟 Broker建立连接等等很多核心的流程,其实都是在发送消息时建立的。因为在启动 时,还不知道要拉取哪个Topic的Broker列表呢。所以对于这个问题,我们关注的重 点,不应该是start方法,而是send方法。
而对NameServer的地址管理,则是散布在启动和发送的多个过程当中,并且 NameServer地址可以通过一个Http服务来获取。
Send方法中,首先需要获得Topic的路由信息。这会从本地缓存中获取,如果本地 缓存中没有,就从NameServer中去申请。

Producer的负载均衡

在之前介绍RocketMQ的顺序消息时,讲到了Producer的负载均衡策略,默认会把 消息平均的发送到所有MessageQueue里的。那到底是怎么进行负载均衡的呢?
获取路由信息后,会选出一个MessageQueue去发送消息。这个选 MessageQueue的方法就是一个索引自增然后取模的方式。然后 在发送Netty请求时,实际上是指定的MessageQueue,而不是Topic。Topic 只是用来找MessageQueue。
然后根据MessageQueue再找所在的Broker,往Broker发送请求。
 

消息存储 
功能 
Producer把消息发到了Broker,接下来就关注下Broker接收 到消息后是如何把消息进行存储的。最终存储的文件有哪些?
commitLog:消息存储目录

config:运行期间一些配置信息
consumerqueue:消息消费队列存储目录 

index:消息索引文件存储目录

abort:如果存在改文件寿命Broker非正常关闭

checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、 consumerquueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。
messageStore是负责消息存 储的核心组件。 

消息存储的入口在:DefaultMessageStore.putMessage
1-commitLog写入
CommitLog的doAppend方法就是Broker写入消息的实际入口。这个方法最终会 把消息追加到MappedFile映射的一块内存里,并没有直接写入磁盘。写入消息的过 程是串行的,一次只会允许一个线程写入。
2-分发ConsumeQueue和IndexFile
 当CommitLog写入一条消息后,在DefaultMessageStore的start方法中,会启 动一个后台线程reputMessageService每隔1毫秒就会去拉取CommitLog中最新更 新的一批消息,然后分别转发到ComsumeQueue和IndexFile里去,这就是他底层 的实现逻辑。
 并且,如果服务异常宕机,会造成CommitLog和ConsumeQueue、IndexFile文 件不一致,有消息写入CommitLog后,没有分发到索引文件,这样消息就丢失了。 DefaultMappedStore的load方法提供了恢复索引文件的方法,入口在load方法。
3、文件同步刷盘与异步刷盘
 入口:CommitLog.putMessage -> CommitLog.handleDiskFlush
 其中主要涉及到是否开启了对外内存。TransientStorePoolEnable。如果开启了 堆外内存,会在启动时申请一个跟CommitLog文件大小一致的堆外内存,这部分内 存就可以确保不会被交换到虚拟内存中。
4、过期文件删除
 入口: DefaultMessageStore.addScheduleTask -> DefaultMessageStore.this.cleanFilesPeriodically()
 默认情况下, Broker会启动后台线程,每60秒,检查CommitLog、 ConsumeQueue文件。然后对超过72小时的数据进行删除。也就是说,默认情况 下, RocketMQ只会保存3天内的数据。这个时间可以通过fileReservedTime来配 置。注意他删除时,并不会检查消息是否被消费了。
整个文件存储的核心入口入口在DefaultMessageStore的start方法中。

总结:
RocketMQ的存储文件包括消息文件(Commitlog)、消息消费队列文件 (ConsumerQueue)、Hash索引文件(IndexFile)、监测点文件 (checkPoint)、abort(关闭异常文件)。单个消息存储文件、消息消费队列文 件、Hash索引文件长度固定以便使用内存映射机制进行文件的读写操作。 RocketMQ组织文件以文件的起始偏移量来命令文件,这样根据偏移量能快速定位 到真实的物理文件。RocketMQ基于内存映射文件机制提供了同步刷盘和异步刷盘 两种机制,异步刷盘是指在消息存储时先追加到内存映射文件,然后启动专门的刷 盘线程定时将内存中的文件数据刷写到磁盘。
CommitLog,消息存储文件,RocketMQ为了保证消息发送的高吞吐量,采用单一 文件存储所有主题消息,保证消息存储是完全的顺序写,但这样给文件读取带来了 不便,为此RocketMQ为了方便消息消费构建了消息消费队列文件,基于主题与队 列进行组织,同时RocketMQ为消息实现了Hash索引,可以为消息设置索引键,根 据所以能够快速从CommitLog文件中检索消息。
当消息达到CommitLog后,会通过ReputMessageService线程接近实时地将消息 转发给消息消费队列文件与索引文件。为了安全起见,RocketMQ引入abort文件, 记录Broker的停机是否是正常关闭还是异常关闭,在重启Broker时为了保证 CommitLog文件,消息消费队列文件与Hash索引文件的正确性,分别采用不同策 略来恢复文件。
RocketMQ不会永久存储消息文件、消息消费队列文件,而是启动文件过期机制并 在磁盘空间不足或者默认凌晨4点删除过期文件,文件保存72小时并且在删除文件时 并不会判断该消息文件上的消息是否被消费。

消费者 
功能

消费者也是有两种,推模式消费者和拉模式消费者

消费者以消费者组的模式开展。消费者组之间有集群模式和广播模式两种消费模式,消费者端的负载均衡的原理。即消费者是如何绑定消费队列的

拉模式: PullMessageService
PullRequest里有messageQueue和processQueue,其中messageQueue负责拉 取消息,拉取到后,将消息存入processQueue,进行处理。 存入后就可以清空 messageQueue,继续拉取了

客户端负载均衡策略
在消费者示例的start方法中,启动RebalanceService,这个是客户端进行负载均衡 策略的启动服务。他只负责根据负载均衡策略获取当前客户端分配到的 MessageQueue示例。
 
五种负载策略,可以由Consumer的allocateMessageQueueStrategy属性来选 择。
最常用的是AllocateMessageQueueAveragely平均分配和 AllocateMessageQueueAveragelyByCircle平均轮询分配。 
平均分配是把MessageQueue按组内的消费者个数平均分配。
而平均轮询分配就是把MessageQueue按组内的消费者一个一个轮询分配。 

并发消费与顺序消费的过程
消费的过程依然是在DefaultMQPushConsumerImpl的 consumeMessageService中。他有两个子类 ConsumeMessageConcurrentlyService和ConsumeMessageOrderlyService。 其中最主要的差别是ConsumeMessageOrderlyService会在消费前把队列锁起来, 优先保证拉取同一个队列里的消息

延迟消息 
1、功能回顾 
我们这里,就用一个典型的延迟消息的流程,来把上面看到的各个组件,结合一 下。
延迟消息的核心使用方法就是在Message中设定一个MessageDelayLevel参数,对 应18个延迟级别。然后Broker中会创建一个默认的Schedule_Topic主题,这个主题 下有18个队列,对应18个延迟级别。消息发过来之后,会先把消息存入 Schedule_Topic主题中对应的队列。然后等延迟时间到了,再转发到目标队列,推 送给消费者进行消费。
整个延迟消息的实现方式是这样的:

延迟消息的处理入口在scheduleMessageService这个组件中。 他会在broker启动 时也一起加载。
1、消息写入:
代码见CommitLog.putMessage方法。
在CommitLog写入消息时,会判断消息的延迟级别,然后修改Message的Topic和 Queue,达到转储Message的目的。
2、消息转储到目标Topic
这个转储的核心服务是scheduleMessageService,他也是Broker启动过程中的一 个功能组件、
然后ScheduleMessageService会每隔1秒钟执行一个executeOnTimeup任务,将消息从延迟队列中写入正常Topic中。 代码见ScheduleMessageService中的 DeliverDelayedMessageTimerTask.executeOnTimeup方法。
这个其中有个需要注意的点就是在ScheduleMessageService的start方法中。有一 个很关键的CAS操作:
if (started.compareAndSet(false, true)) 
这个CAS操作保证了同一时间只会有一个DeliverDelayedMessageTimerTask执 行。保证了消息安全的同时也限制了消息进行回传的效 

小结: 
RocketMQ消息消费方式分别为集群模式、广播模式。
消息队列负载由RebalanceService线程默认每隔20s进行一次消息队列负载,根据 当前消费者组内消费者个数与主题队列数量按照某一种负载算法进行队列分配,分 配原则为同一个消费者可以分配多个消息消费队列,同一个消息消费队列同一个时 间只会分配给一个消费者。
消息拉取由PullMessageService线程根据RebalanceService线程创建的拉取任务进 行拉取,默认每次拉取32条消息,提交给消费者消费线程后继续下一次消息拉取。 如果消息消费过慢产生消息堆积会触发消息消费拉取流控。 
并发消息消费指消费线程池中的线程可以并发对同一个消息队列的消息进行消费, 消费成功后,取出消息队列中最小的消息偏移量作为消息消费进度偏移量存储在于 消息消费进度存储文件中,集群模式消息消费进度存储在Broker(消息服务器), 广播模式消息消费进度存储在消费者端。
RocketMQ不支持任意精度的定时调度消息,只支持自定义的消息延迟级别,例如 1s、2s、5s等,可通过在broker配置文件中设置messageDelayLevel。
顺序消息一般使用集群模式,是指对消息消费者内的线程池中的线程对消息消费队 列只能串行消费。与并发消息消费最本质的区别是消息消费时必须成功锁定消息消 费队列,在Broker端会存储消息消费队列的锁占用情况。 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735884.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号