目录
写在前面
1、生产者往Broker集群发送消息底层原理
1.1、创建Topic的时候为何要指定MessageQueue数量?
1.2、Topic、MessageQueue以及Broker之间到底是什么关系
1.3、生产者发送消息的时候写入哪个MessageQueue?
1.4、如果某个Broker出现故障该怎么办?
2、Broker是如何持久化存储消息的?
2.1、CommitLog消息顺序写入机制
2.2、如何让消息写入CommitLog文件近乎内存写性能的?
3、基于DLedger技术的Broker 主从同步原理
3.1、基于DLedger技术替换Broker的CommitLog
3.2、DLedger是如何基于Raft协议选举Leader Broker的?
3.3、DLedger是如何基于Raft协议进行多副本同步的?
3.4、如果Leader Brok
总结
4、消费者是如何获取消息处理以及进行ACK?
4.1、消费者组
4.2、MessageQueue与消费者的关系
4.3、Push和Pull模式
4.4、Broker是如何将消息读取出来返回给消费机器的?
4.5、消费者机器如何处理消息、进行ACK以及提交消费进度?
4.6、如果消费组中出现机器宕机或者扩容加机器,会怎么处理?
5、消费者到底是根据什么策略从Master或Slave上拉取消息的
5.1、ConsumeQueue文件也是基于os cache的
5.2、什么时候会从os cache读?什么时候会从磁盘读?
5.3、Master Broker什么时候会让你从Slave Broker拉取数据?
写在前面
目前公司生产环境的情况,就是部署了一个小规模的RocketMQ生产集群,基本都是在稳定运行中,可以支撑公司的核心链路以及秒杀业务,然后有订单系统、大数据系统、库存系统、积分系统等各种公司核心系统都接入了RocketMQ的生产和消费。
- 对生产者往Broker集群发送消息的底层原理做一个研究?
- 看看Broker对于接收到的消息,到底是如何存储到磁盘上去的?
- 基于DLedger技术部署的Broker高可用集群,到底如何进行数据同步的?
- 消费者到底是基于什么策略选择Master或Slave拉取数据的?
- 消费者是如何从Broker拉取消息回来,进行处理以及ACK的?
- 如果消费者故障了会如何处理?
整体分为1、发送;2、存储;3、同步;4、拉取;5、ack;
1、生产者往Broker集群发送消息底层原理
1.1、创建Topic的时候为何要指定MessageQueue数量?
在创建Topic的时候需要指定一个很关键的参数,就是MQ。简单来说,就是你要指定你的这个Topic对应了多少个队列,也就是多少个MQ。
1.2、Topic、MessageQueue以及Broker之间到底是什么关系
MQ本质上就是一个数据分片的机制。
在这个机制中,假设你的Topic有1万条数据,然后你的Topic有4个MQ,那么大致可以认为会在每个MQ中放入2500条数据。当然,这个不是绝对的,有可能有的MQ的数据多,有的数据少,这个要根据你的消息写入MQ的策略来定。我们有4个MessageQueue平均分配了Topic的数据,这些MessageQueue放在哪里? 当然是放在Broker上了!
很有可能就是在2个Broker上,每个Broker放两个MessageQueue
总结:MQ就是RocketMQ中一个数据分片机制,他通过MQ将一个Topic的数据拆分为了很多个数据分片,然后在每个Broker机器上都存储一些MQ。通过这个就可以实现Topic数据的分布式存储!
1.3、生产者发送消息的时候写入哪个MessageQueue?
生产者会跟NameServer进行通信获取Topic的路由数据。 NameServer上的路由信息包含:一个Topic有几个MQ,MQ1和MQ2在Broker1上,MQ3和MQ4在Broker2上。先假设Producer发送的消息平均分给MQ,例如共20条消息给4个MQ则每个MQ分5条。这样的好处:
- Rocket集群扛更高的并发:假设单个Broker可以抗每秒7万并发,那么两个Broker就可以抗每秒14万并发!这样就可以实现RocketMQ集群抗下每秒10万+超高并发的场景了!
- 数据海量存储:是可以让一个Topic中的数据分散在多个MessageQueue中,进而分散在多个Broker机器上?这样就可以实现RocketMQ集群分布式存储海量的消息数据了。
1.4、如果某个Broker出现故障该怎么办?
通常来说建议大家在Producer中开启一个开关,就是 sendLatencyFaultEnable 。 一旦打开了这个开关,那么他会有一个 自动容错机制 ,比如如果某次访问一个Broker发现网络延迟有500ms,然后还无法访问,那么就会 自动回避访问这个Broker一段时间,比如接下来3000ms内,就不会访问这个Broker了 。这样的话,就可以 避免一个Broker故障之后,短时间内生产者频繁的发送消息到这个故障的Broker上去,出现较多次数的异常 。而是在一个Broker故障之后,自动回避一段时间不要访问这个Broker,过段时间再去访问他。那么这样过一段时间之后,可能这个Master Broker就已经恢复好了,比如他的Slave Broker切换为了Master可以让别人访问了。2、Broker是如何持久化存储消息的?
实际上类似RocketMQ、Kafka、RabbitMQ的消息中间件系统,他们不只是让你写入消息和获取消息那么简单,他们本身最重要的就是提供强大的数据存储能力,可以把亿万级的海量消息存储在自己的服务器的磁盘上。这样的话,各种不同的系统从MQ中消费消息的时候,才可以从MQ服务器的磁盘中读取到自己需要的消息。 否则如果MQ不在机器磁盘上存储大量的消息,如果消息都放在自己的内存里,一个是内存很可能放不下,另外一个是可能你机器重 启,内存里的消息就会全部丢失了。
Broker数据存储实际上才是一个MQ最核心的环节 ,他决定了生产者消息写入的吞吐量,决定了消息不能丢失,决定了消费者获取消息的吞吐量,这2.1、CommitLog消息顺序写入机制
第一步,他会把这个消息直接写入磁盘上的一个日志文件,叫做CommitLog,直接顺序写入这个文件。CommitLog是很多磁盘文件,每个文件限定最多1GB,Broker收到消息之后就直接追加写入这个文件的末尾,如果一个CommitLog写满了1GB,就会创建一个新的CommitLog文件。
Topic的每个MessageQueue都对应了Broker机器上的多个ConsumeQueue文件,保存了这个MessageQueue的所有消息
假设有一个Topic,他有4个MQ,然后在两台Broker机器上,每台Broker机器会存储两个MQ。 假设Producer选择对其中一个MQ写入了一条消息,此时消息会发送到Broker上会把这个消息写入自己的CommitLog文件中,同时会将这条消息在CommitLog中的物理位置,也就是一个文件偏移量,就是一个offset,写入到这条消息所属的MQ对应的ConsumeQueue文件中去。
在ConsumeQueue中存储的每条数据不只是消息在CommitLog中的offset偏移量,还包含了消息的长度,以及tag hashcode,一条数据是20个字节,每个ConsumeQueue文件保存30万条数据,大概每个文件是5.72MB。而每个CommitLog最大1G。
故一个MQ回包含多个commitLog和多个CQ文件。
- commitLog存放具体的消息;
- ConsumerQueue存放每条消息在commitLog中的偏移量。
2.2、如何让消息写入CommitLog文件近乎内存写性能的?
Broker是基于OS操作系统的PageCache和顺序写两个机制,来提升写入CommitLog文件的性能。
- 首先Broker是以顺序的方式将消息写入CommitLog磁盘文件的,也就是每次写入就是在文件末尾追加一条数据就可以了,对文件进行顺序写的性能要比对文件随机写的性能提升很多。
- 数据写入CommitLog文件的时候,其实不是直接写入底层的物理磁盘文件的,而是先进入OS的PageCache内存缓存中,然后后续由OS的后台线程选一个时间,异步化的将OS PageCache内存缓冲中的数据刷入底层的磁盘文件。
举例因为这个部分的性能提升会直接提升Broker处理消息写入的吞吐量,比如你写入一条消息到CommitLog磁盘文件假设需要10ms,那么每个线程每秒可以处理100个写入消息,假设有100个线程,每秒只能处理1万个写入消息请求。 但是如果你把消息写入CommitLog磁盘文件的性能优化为只需要1ms,那么每个线程每秒可以处理1000个消息写入,此时100个线程每秒可以处理10万个写入消息请求。所以大家可以明显看到,Broker把接收到的消息写入CommitLog磁盘文件的性能,对他的TPS有很大的影响。
采用磁盘文件顺序写+OS PageCache写入+OS异步刷盘的策略,基本上可以让消息写入CommitLog的性能跟你直接写入内存里是差不多 的,正是如此,才可以让Broker高吞吐的处理每秒大量的消息写入。- 异步刷盘模式下,生产者把消息发送给Broker,Broker将消息写入OS PageCache中,就直接返回ACK给生产者了。异步刷盘的的策略下,可以让消息写入吞吐量非常高,但是可能会有数据丢失的风险。
- 同步刷盘,如果你使用同步刷盘模式的话,那么生产者发送一条消息出去,broker收到了消息,必须直接强制把这个消息刷入底层的物理磁盘文件中,然后才会返回ack给producer,此时你才知道消息写入成功了。 只要消息进入了物理磁盘上,那么除非是你的物理磁盘坏了导致数据丢失,否则正常来说数据就不会丢失了。导致每条消息写入性能急剧下降,导致消息写入吞吐量急剧下降,但是可以保证数据不会丢失。
同步刷盘和异步刷盘各自的优缺点:高吞吐写入+丢失数据风险,写入吞吐量下降+数据不丢失
3、基于DLedger技术的Broker 主从同步原理
一条数据就会在三个Broker上有三份副本,此时如果Leader Broker宕机,那么就直接让其他的Follower Broker自动切换为新的Leader Broker,继续接受客户端的数据写入就可以了。
3.1、基于DLedger技术替换Broker的CommitLog
Broker上述高可用架构就是基于DLedger技术来实现的。
- DLedger技术实际上首先他自己就有一个CommitLog机制,你把数据交给他,他会写入CommitLog磁盘文件里去,这是他能干的第一件事情。
3.2、DLedger是如何基于Raft协议选举Leader Broker的?
DLedger是基于Raft协议来进行Leader Broker选举的:
Raft协议中选举leader算法的简单描述:确保有人可以成为Leader的核心机制就是一轮选举不出来Leader的话, 就让大家随机休眠一下,先苏醒过来的人会投票给自己,其他人苏醒过后发现自己收到选票了,就会直接投票给那个人。依靠这个随机休眠的机制,基本上几轮投票过后,一般都是可以快速选举出来一个Leader。
举例:Broker01是投票给自己的,Broker02是投票给自己的,Broker03是投票给自己的,他们都把自己的投票发送给了别 人。 此时在第一轮选举中,Broker01会收到别人的投票,他发现自己是投票给自己,但是Broker02投票给Broker02自己,Broker03投票给 Broker03自己,似乎每个人都很自私,都在投票给自己,所以第一轮选举是失败的。 因为大家都投票给自己,怎么选举出来一个Leader呢? 接着每个人会进入一个随机时间的休眠,比如说Broker01休眠3秒,Broker02休眠5秒,Broker03休眠4秒。 此时Broker01必然是先苏醒过来的,他苏醒过来之后,直接会继续尝试投票给自己,并且发送自己的选票给别人。 接着Broker03休眠4秒后苏醒过来,他发现Broker01已经发送来了一个选票是投给Broker01自己的,此时他自己因为没投票,所以会 尊重别人的选择,就直接把票投给Broker01了,同时把自己的投票发送给别人。 接着Broker02苏醒了,他收到了Broker01投票给Broker01自己,收到了Broker03也投票给了Broker01,那么他此时自己是没投票 的,直接就会尊重别人的选择,直接就投票给Broker01,并且把自己的投票发送给别人。 此时所有人都会收到三张投票,都是投给Broker01的,那么Broker01就会当选为Leader。 其实只要有(3台机器 / 2) + 1个人投票给某个人,就会选举他当Leader,这个(机器数量 / 2) + 1就是大多数的意思。
3.3、DLedger是如何基于Raft协议进行多副本同步的?
基于Raft协议实现的两阶段完成的数据同步机制:
数据同步会分为两个阶段,一个是uncommitted阶段,一个是commited阶段 首先Leader Broker上的DLedger收到一条数据之后,会标记为uncommitted状态,然后他会通过自己的DLedgerServer组件把这个 uncommitted数据发送给Follower Broker的DLedgerServer。 接着Follower Broker的DLedgerServer收到uncommitted消息之后,必须返回一个ack给Leader Broker的DLedgerServer,然后如果Leader Broker收到超过半数的Follower Broker返回ack之后,就会将消息标记为committed状态。然后Leader Broker上的DLedgerServer就会发送commited消息给Follower Broker机器的DLedgerServer,让他们也把消息标记为 comitted状态。3.4、如果Leader Brok
如果Leader Broker挂了,此时剩下的两个Follower Broker就会重新发起选举,他们会基于DLedger还是采用Raft协议的算法,去选举出来一个新的Leader Broker继续对外提供服务,而且会对没有完成的数据同步进行一些恢复性的操作,保证数据不会丢失。
总结
高可用Broker架构而言,无论是CommitLog写入,还是多副本同步,Leader选举都是基于DLedger来实现的。
4、消费者是如何获取消息处理以及进行ACK?
4.1、消费者组
不同的系统应该设置不同的消费组
,如果不同的消费组订阅了同一个Topic,对Topic里的一条消息,每个消费组都会获取到这条消息。
如果一个系统的同一个消费者组含有多台机器,那么只有一台机器获取该消息。
比如库存系统部署了4台机器,每台机器上的消费者组的名字都是“stock_consumer_group”,那么这4台机器就同属于一个消费者组,其中只有一台机器消费同一个消息。以此类推,每个系统的几台机器都是属于各自的消费者组的。
对于一个消费组而言,他获取到一条消息之后,如果消费组内部有多台机器,到底是只有一台机 器可以获取到这个消息,还是每台机器都可以获取到这个消息?
- 集群模式 ,也就是说,一个消费组获取到一条消息,只会交给组内的一台机器去处理,不是每台机器都可以获取到这条消息的。
- 广播模式,那么对于消费组获取到的一条消息,组内每台机器都可以获取到这条消息。但是相对而言广播模式其实用的很少,常见基本上都是使用集群模式来进行消费的。
4.2、MessageQueue与消费者的关系
对于一个Topic上的多个MessageQueue,是如何由一个消费组中的多台机器来进行消费的呢?
- 可以简单的理解为,他会均匀的将MessageQueue分配给消费组的多台机器来消费。
举例:
假设我们的“TopicOrderPaySuccess”有4个MessageQueue,这4个MessageQueue分布在两个Master Broker上,每个Master Broker上有2个MessageQueue。然后库存系统作为一个消费组里有两台机器,那么正常情况下,当然最好的就是让这两台机器每个都负责2个MessageQueue的消费了比如库存系统的机器01从Master Broker01上消费2个MessageQueue,然后库存系统的机器02从Master Broker02上消费2个MessageQueue,这样不就把消费的负载均摊到两台Master Broker上去了。
大致可以认为一个Topic的多个MessageQueue会均匀分摊给消费组内的多个机器去消费,这里的一个原则就是,一个MessageQueue只能被一个消费机器去处理,但是一台消费者机器可以负责多个MessageQueue的消息处理。
4.3、Push和Pull模式
一个消费组内的多台机器是分别负责一部分MessageQueue的消费的,那么既然如此,每台机器都必须去连接到 对应的Broker,尝试消费里面的MessageQueue对应的消息了。
两种消费模式一个是Push,一个是Pull
实际上,这两个消费模式本质是一样的,都是消费者机器主动发送请求到Broker机器去拉取一批消息下来。
Push消费模式本质底层也是基于这种消费者主动拉取的模式来实现的,只不过他的名字叫做Push而已,意思是
Broker会尽可能实时的把新消息交给消费者机器来进行处理,他的消息时效性会更好
。一般我们使用RocketMQ的时候,消费模式通常都是基于他的Push模式来做的,
Push模式的实现思路
:当消费者发送请求到Broker去拉取消息的时候,如果有新的消息可以消费那么就会立马返回一批消息到消费机器去处理,处理完之后会接着立刻发送请求到Broker机器去拉取下一批消息。所以消费机器在Push模式下会处理完一批消息,立马发起请求拉取下一批消息,消息处理的时效性非常好,看起来就跟Broker一直不停的推送消息到消费机器一样。
Push模式下有一个
请求挂起和长轮询的机制:
当你的请求发送到Broker,结果他发现没有新的消息给你处理的时候,就会让请求线程挂起,默认是挂起15秒,然后这个期间他会有后台线程每隔一会儿就去检查一下是否有的新的消息给你,另外如果在这个挂起过程中,如果有新的消息到达了会主动唤醒挂起的线程,然后把消息返回给你。
4.4、Broker是如何将消息读取出来返回给消费机器的?
本质就是根据你要消费的MQ以及开始消费的位置,去找到对应的ConsumeQueue读取里面对应位置的消息在CommitLog中的物理offset偏移量,然后到CommitLog中根据offset读取消息数据,返回给消费者机器。
4.5、消费者机器如何处理消息、进行ACK以及提交消费进度?
4.5、消费者机器如何处理消息、进行ACK以及提交消费进度?
消费者机器拉取到一批消息之后,就会将这批消息回调我们注册的一个函数(处理消息并标识该消息成功消费),当我们处理完这批消息之后,消费者机器就会提交我们目前的一个消费进度到Broker上去,然后Broker就会存储我们的消费进度。比如我们现在对ConsumeQueue0的消费进度假设就是在offset=1的位置,那么他会记录下来一个ConsumeOffset的东西去标记我们的消费进度,那么下次这个消费组只要再次拉取这个ConsumeQueue的消息,就可以从Broker记录的消费位置开始继续拉取,不用重头开始拉取 了。类似于看书的书签,书签记录看的进度。
4.6、如果消费组中出现机器宕机或者扩容加机器,会怎么处理?
rabalance(负载重平衡)
:重新给各个消费机器分配他们要处理的MessageQueue。
举例:比如现在机器01负责MessageQueue0和Message1,机器02负责MessageQueue2和MessageQueue3,现在机器02宕机了,那么机器01就会接管机器02之前负责的MessageQueue2和MessageQueue3。或者如果此时消费组加入了一台机器03,此时就可以把机器02之前负责的MessageQueue3转移给机器03,然后机器01就仅仅负责一个MessageQueue2的消费了,这就是的概念。
5、消费者到底是根据什么策略从Master或Slave上拉取消息的
到底什么时候从Master Broker拉取,什么时候从Slave Broker拉取?
- 刚开始消费者都是连接到Master Broker机器去拉取消息的,然后如果Master Broker机器觉得自己负载比较高,就会告诉消费者机器,下次可以从Slave Broker机器去拉取。
5.1、ConsumeQueue文件也是基于os cache的 对于 Broker机器的磁盘上的大量的ConsumeQueue文件,在写入的时候也都是优先进入os cache 中的。ConsumeQueue文件主要是存放消息的offset,所以每个文件很小,30万条消息的offset就只有5.72MB而已。所以 实际上ConsumeQueue文件们是不占用多少磁盘空间的,他们整体数据量很小,几乎可以完全被os缓存在内存cache 里。
当你拉取消息的时候:
- 第一步 大量的频繁读取ConsumeQueue文件,几乎可以说就是跟读内存里的数据的性能是一样的,通过这个就可以保证数据消费的高性能以及高吞吐。
- 第二步 就是要根据你读取到的offset去CommitLog里读取消息的完整数据了。
但是当你去CommitLog文件里读取完整消息数据的时候,会有两种可能从os cache或磁盘。
- 第一种可能,如果你读取的是那种刚刚写入CommitLog的数据,那么大概率他们还停留在os cache中,此时你可以顺利的直接从os cache里读取CommitLog中的数据,这个就是内存读取,性能是很高的。
- 第二种可能,你也许读取的是比较早之前写入CommitLog的数据,那些数据早就被刷入磁盘了,已经不在os cache里了,那么此时你就只能从磁盘上的文件里读取了,这个性能是比较差一些的。
5.2、什么时候会从os cache读?什么时候会从磁盘读?
- 如果你的消费者机器一直快速的在拉取和消费处理,紧紧的跟上了生产者写入broker的消息速率,那么你每次拉取几乎都是在拉取最近人家刚写入CommitLog的数据,那几乎都在os cache里。
- 如果broker的负载很高,导致你拉取消息的速度很慢,或者是你自己的消费者机器拉取到一批消息之后处理的时候性能很低,处理的速度很慢,这都会导致你跟不上生产者写入的速率。
举例:
人家都写入10万条数据了,结果你才拉取了2万条数据,此时有5万条最新的数据是在os cache里,有3万条你还没拉取的数据是在磁盘里,那么当后续你再拉取的时候,必然很大概率是从磁盘里读取早就刷入磁盘的3万条数据。接着之前在os cache里的5万条数据可能又被刷入磁盘了,取而代之的是更新的几万条数据在os cache里,然后你再次拉取的时候,又会从磁盘里读取刷入磁盘里的5万条数据,相当于你每次都在从磁盘里读取数据了!
5.3、Master Broker什么时候会让你从Slave Broker拉取数据?
对比你当前没有拉取消息的数量和大小,以及最多可以存放在os cache内存里的消息的大小,如 果你没拉取的消息超过了最大能使用的内存的量,那么说明你后续会频繁从磁盘加载数据,此时就让你从slave broker去加载数据。



