Producer发送消息源码分析:
producer.send(msg);
1.消息检查Validators.checkMessage(msg, this);
1.消息不能为null
2.topic检查不能为null,不能为非法字符,长度不能为0,topic长度不能大于127
3.topic是否为允许发送的topic
4.body检查,不能为null,长度不能为0,不能大于消息限制长度4M
2.设置topic(如果有命名空间则带上命名空间)msg中含有topic,此处只是为了加上命名空间msg.setTopic(withNamespace(msg.getTopic()));
3.发送消息this.defaultMQProducerImpl.send(msg);
1.默认为同步消息,超时时间为3s DefaultMQProducerImpl.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
1.确定服务状态为running ServiceState.RUNNING
2.再次检查消息内容
3.尝试获取topic的路由发布信息,优先从缓存中获取,如果topic路由发布信息不存在则,更新topic的路由信息到nameserv,并添加到缓存中
获取到路由信息:
1.获取重试次数(同步为1+失败重试次数,异步为1)
2.获取最近一次的brokername
3.根据brokername选择消息队列MessageQueue mqSelected = this.selectoneMessageQueue(topicPublishInfo, lastBrokerName);
是否开启故障延迟机制:(默认不启用故障延迟机制)开启则轮询选择,并且判断broker是否可用,删除一段时间内不可用的broker
整体实现思路:
1.在消息发送失败, mq 根据消息发送耗时来预测该 broker 不可用的时长, 并将 broker 名称, 及” 预计恢复时长“, 存储于 ConcurrentHashMap
faultItemTable 中
2.在开启消息容错后, 选择消息队列时, 会根据当前时间与 FaultItem 中该 broker 的预计恢复时间做比较, 若(System.currentTimeMillis() -
startTimestamp) >= 0, 则预计该 broker 恢复正常, 选择该 broker 的消息队列
3.若所有的 broker 都预计不可用, 随机选择一个不可用的 broker, 从路由信息中选择下一个消息队列, 重置其 brokerName, queueId, 进行消息发
送
默认的投递方式比较简单, 但是也暴露了一个问题, 就是有些 Queue 队列可能由于自身数量积压等原因, 可能在投递的过程比较长, 对于这样的 Queue
队列会影响后续投递的效果。
基于这种现象, RocketMQ 在每发送一个 MQ 消息后, 都会统计一下消息投递的时间延迟, 根据这个时间延迟, 可以知道往哪些 Queue 队列投递的速度快。
在这种场景下, 会优先使用消息投递延迟最小的策略, 如果没有生效, 再使用 Queue 队列轮询的方式
统计一下消息投递的时间延迟: org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem 的实现
记录的地方还是“消息发送流程” 中核心方法中 DefaultMQProducerImpl 中的 sendDefaultImpl()是生产者消息发送的核心方法
这里的操作大概如下:
1. 根据消息发送时长(currentLatency), 计算 broker 不可用时长(duration), 即如果消息发送时间越久, mq 会认为 broker 不可用的时长越久, broker
不可用时长是个经验值, 如果传入 isolation 为 true, 表示默认当前发送时长为 30000L, 即 broker 不可用时长为 600000L
2. 调用 latencyFaultTolerance.updateFaultItem 更新 broker 异常容错信息。
这个方法最终会往一个 ConcurrentHashMap 表中写每台 broker 的延时、 key 是 brokerName, value 是 currentLatency(延时)
不开启则进行轮询选择队列 tpInfo.selectoneMessageQueue(lastBrokerName);
1.为空则进行轮询算法从路由信息消息队列集合中选择一个队列 第一次选择队列(非重试)
2.判断broker是否为空(不为空则轮询选择队列并且brokername不与上次相同即不为同一个broker)重试规避相同的broker
3.没有集群,或者说没有选择则使用默认轮询进行选择
轮询模式使用了threalocal做计数器然后对队列进行取模选择队列,因为本身消息的生产就可以多线程进行, 所以当然要基于线程的上下文来计数递增
选择队列策略的对比
在默认队列选择机制下, 会随机选择一个 MessageQueue, 若发送失败, 轮询队列重新进行重试发送(屏蔽单次发送中不可用的 broker), 同步模式下
默认失败时重试发送 2 次, 但它的问题就是消息发送很大可能再次失败, 引发再次重复失败, 带来不必要的性能损耗。
在开启故障延迟机制后, 消息队列选择时, 会在一段时间内过滤掉 RocketMQ 认为不可用的 broker, 以此来避免不断向宕机的 broker 发送消息, 从而
实现消息发送高可用。
这两个策略没有绝对的好与坏, King 老师个人认为, 如果工作中选择, 应该是看网络环境和服务器的环境。
如果是网络和服务器环境比较好, 那么我推荐默认策略, 毕竟重试的次数和几率比较小。
如果是网络和服务器环境压力比较大, 推荐使用故障延迟机制。
4.如果为重试(即times>0),则重置消息的topic
5.如果队列选择超时(即队列选择花费时间大于超时时间),则设置调用超时为true
6.发送消息 sendKernelImpl
1.生成消息id
2.判断是否有检查终止钩子
3.消息发送之前钩子
4.根据消息类型发送消息this.mQClientFactory.getMQClientAPIImpl().sendMessage发送消息
netty客户端发送消息RemotingCommand response = MQClientAPIImpl.remotingClient.invokeSync(addr, request, timeoutMillis);
5.执行钩子程序
4.验证nameserver设置(即获取namerserver地址列表存不存在)
5.如果步骤3未获取到topic路由信息则抛出异常