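- Feature 1: send latency fault tolerance (sendLatencyFaultEnable)
  - Principle
  - Diagram
  - Send latency vs. availability delay: the metadata mapping
  - Source analysis: selectOneMessageQueue
  - Fallback strategy: round-robin
  - Source analysis: updateFaultItem
  - Updating a broker's availability delay
- Feature 2: MQClient pulls and handles NameServer changes
  - updateTopicRouteInfoFromNameServer revisited
  - Source analysis: isNeedUpdateTopicRouteInfo
  - Producer update: updateTopicPublishInfo
  - Consumer update: updateTopicSubscribeInfo
- Summary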
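Feature 1: send latency fault tolerance (sendLatencyFaultEnable)
Principle
This is an optimization in how the producer sends messages. When send latency fault tolerance is enabled (sendLatencyFaultEnable=true), the producer records the latency and the outcome of every send (updateFaultItem). If sends to a broker fail or take too long, the producer stops choosing that broker for a period of time (selectOneMessageQueue).
Diagram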
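faultItemTable maps each brokerName to a FaultItem that holds the broker's availability information. A FaultItem contains currentLatency (the send latency), brokerName (the broker node name), and startTimestamp (the point in time after which the broker is available again).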
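To make the structure concrete, here is a minimal sketch of a FaultItem-like class and the availability check it supports. It follows the fields listed above; the class name FaultItemSketch, the explicit `now` parameter, and the rule that a broker with no entry counts as available are assumptions for illustration, not a copy of the RocketMQ source.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of faultItemTable: brokerName -> FaultItem.
// Field names follow the article; `now` is passed in explicitly so the logic is easy to test.
class FaultItemSketch {

    static class FaultItem {
        final String brokerName;        // broker node name
        volatile long currentLatency;   // latency of the most recent send, in ms
        volatile long startTimestamp;   // broker counts as available again from this instant (epoch ms)

        FaultItem(String brokerName) {
            this.brokerName = brokerName;
        }

        // Available once the clock has reached startTimestamp.
        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    static final Map<String, FaultItem> faultItemTable = new ConcurrentHashMap<>();

    // Assumption: a broker that has never been recorded is treated as available.
    static boolean isAvailable(String brokerName, long now) {
        FaultItem item = faultItemTable.get(brokerName);
        return item == null || item.isAvailable(now);
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        FaultItem slow = new FaultItem("broker-a");
        slow.currentLatency = 600;
        slow.startTimestamp = now + 30_000; // skipped for 30 s
        faultItemTable.put(slow.brokerName, slow);

        System.out.println(isAvailable("broker-a", now));          // false
        System.out.println(isAvailable("broker-a", now + 30_000)); // true
        System.out.println(isAvailable("broker-b", now));          // true (no entry)
    }
}
```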
Send latency vs. availability delay: the metadata mapping
| Send latency (latencyMax, ms) | Unavailable duration (notAvailableDuration, ms) | Broker skipped for |
|---|---|---|
| 50 | 0 | 0 s |
| 100 | 0 | 0 s |
| 550 | 30000 | 30 s |
| 1000 | 60000 | 60 s |
| 2000 | 120000 | 2 min |
| 3000 | 180000 | 3 min |
| 15000 | 600000 | 10 min |
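A send latency below 550 ms puts no delay on the broker. From 550 ms up to 1 s the broker is skipped for 30 s; from 1 s up to 2 s, for 60 s; from 2 s up to 3 s, for 2 min; from 3 s up to 15 s, for 3 min; and at 15 s or more, for 10 min. A failed send makes the broker unavailable for 10 min straight away.
Source analysis: selectOneMessageQueue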
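- Step 1: take MessageQueues in round-robin order.
- Step 2: check whether the queue's broker is available (latencyFaultTolerance records the brokers whose sends failed or were slow).
- Step 3: if it is available, return that MessageQueue for sending.
- Step 4: if the for loop finds nothing, fall back to a broker that is not the best but still usable, picked by pickOneAtLeast (which shuffles the recorded brokers).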
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // Enabled with producer.setSendLatencyFaultEnable(true)
    if (this.sendLatencyFaultEnable) { // note: false by default
        try {
            // Step 1: take MessageQueues in round-robin order
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Step 2: check availability (brokers with failed or slow sends are recorded in latencyFaultTolerance)
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    // Step 3: the broker is available, so return this MessageQueue for sending.
                    // As written, a retry (lastBrokerName != null) only accepts a queue on the same broker as the previous attempt.
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // Step 4: the loop found nothing, so pick a broker that is not the best
            // (a send still needs some MessageQueue)
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    // Redirect the MessageQueue to that broker
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                // The broker has no write queues for this topic: drop it from the fault table
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // Final fallback: plain round-robin
        return tpInfo.selectOneMessageQueue();
    }
    // sendLatencyFaultEnable is off (the default), so this path is taken:
    // round-robin that tries to avoid lastBrokerName
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
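The four steps can be exercised outside RocketMQ with a small simulation. Everything here is hypothetical: queues are plain objects, the `unavailableUntil` map stands in for faultItemTable, the lastBrokerName check is left out, and step 4 simply picks the broker whose penalty ends soonest instead of reproducing pickOneAtLeast. It only illustrates the control flow of selectOneMessageQueue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of latency-aware queue selection (not RocketMQ code).
class QueueSelectionSketch {

    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    final List<Queue> queues = new ArrayList<>();
    final Map<String, Long> unavailableUntil = new HashMap<>(); // stand-in for faultItemTable
    int sendWhichQueue = 0;                                     // round-robin counter

    boolean isAvailable(String broker, long now) {
        Long until = unavailableUntil.get(broker);
        return until == null || now >= until;
    }

    // Assumes the queue list is non-empty.
    Queue select(long now) {
        // Steps 1-3: walk the queues round-robin and return the first one on an available broker.
        // Math.floorMod replaces the abs-then-clamp arithmetic of the original.
        int index = sendWhichQueue++;
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (isAvailable(q.brokerName, now)) {
                return q;
            }
        }
        // Step 4 (simplified): every broker is penalized, so every broker has an entry;
        // take a queue on the broker whose penalty ends soonest rather than failing the send.
        Queue best = queues.get(0);
        for (Queue q : queues) {
            if (unavailableUntil.get(q.brokerName) < unavailableUntil.get(best.brokerName)) {
                best = q;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        QueueSelectionSketch s = new QueueSelectionSketch();
        s.queues.add(new Queue("broker-a", 0));
        s.queues.add(new Queue("broker-a", 1));
        s.queues.add(new Queue("broker-b", 0));
        long now = 0;
        s.unavailableUntil.put("broker-a", now + 30_000);  // broker-a was slow: skip it for 30 s
        System.out.println(s.select(now));                 // broker-b#0
        System.out.println(s.select(now));                 // broker-b#0
        s.unavailableUntil.put("broker-b", now + 600_000); // broker-b failed: skip it for 10 min
        System.out.println(s.select(now));                 // broker-a#0 (fallback: broker-a recovers sooner)
    }
}
```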
Fallback strategy: round-robin
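public MessageQueue selectOneMessageQueue() {
    // Advance the shared counter and map it onto the queue list
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    // Guard against a negative index once the counter overflows
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}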
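The `if (pos < 0)` guard is not dead code. The counter keeps growing and eventually wraps to Integer.MIN_VALUE, and Math.abs(Integer.MIN_VALUE) is still negative, so the remainder can be negative too. This hypothetical demo shows the effect with three queues and, for comparison, Math.floorMod, which never returns a negative value for a positive divisor.

```java
// Hypothetical demo of why the round-robin index needs a negative check.
class RoundRobinOverflowDemo {

    // Same arithmetic as selectOneMessageQueue(): abs, modulo, then clamp negatives to 0.
    static int posWithGuard(int index, int size) {
        int pos = Math.abs(index) % size;
        if (pos < 0) {
            pos = 0;
        }
        return pos;
    }

    public static void main(String[] args) {
        int size = 3;
        int index = Integer.MIN_VALUE; // the value the counter reaches after wrapping around
        System.out.println(Math.abs(index));            // -2147483648
        System.out.println(Math.abs(index) % size);     // -2
        System.out.println(posWithGuard(index, size));  // 0
        System.out.println(Math.floorMod(index, size)); // 1
    }
}
```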
Source analysis: updateFaultItem
Converts the send latency into an availability delay (see the mapping table above).
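public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        // Convert the send latency into the matching availability delay.
        // isolation == true usually means the send threw an exception: it is treated as a
        // 30000 ms latency, so the broker becomes available again only after 10 minutes.
        // isolation == false is a normal send: below 550 ms the broker stays available,
        // and above that the delay grows with the send latency.
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}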
private long computeNotAvailableDuration(final long currentLatency) {
    // Match the current send latency against latencyMax, scanning from the largest
    // threshold down, and return the corresponding notAvailableDuration
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}
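Running the lookup on a few sample latencies shows where the bucket boundaries fall. This is a standalone copy for illustration: the arrays come from the table above, the loop is the one in computeNotAvailableDuration, and `durationFor` is a hypothetical helper that mirrors the `isolation ? 30000 : currentLatency` expression in updateFaultItem.

```java
// Hypothetical standalone copy of the latency -> unavailable-duration lookup.
class LatencyMappingDemo {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down and return the duration of the first one reached.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // A failed send (isolation == true) is treated as a 30000 ms latency.
    static long durationFor(boolean isolation, long currentLatency) {
        return computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    }

    public static void main(String[] args) {
        long[] samples = {20, 549, 550, 999, 1000, 2500, 14999, 15000};
        for (long latency : samples) {
            System.out.println(latency + " ms -> " + computeNotAvailableDuration(latency) + " ms");
        }
        System.out.println("failed send -> " + durationFor(true, 0) + " ms"); // 600000 ms
    }
}
```

Each threshold is a lower bound, so 999 ms still lands in the 30 s bucket, and a failed send always lands in the 10-minute bucket because 30000 ms exceeds the largest threshold.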
Updating a broker's availability delay
Updates faultItemTable.
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        // First record for this broker
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            // Another thread inserted an item first: update that one instead
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        // Record the broker's latest send latency
        old.setCurrentLatency(currentLatency);
        // Available again from: System.currentTimeMillis() + unavailable interval
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
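The get / putIfAbsent sequence above is a common lock-free pattern: if two threads record the first fault for the same broker at the same moment, only one FaultItem is inserted and the losing thread writes its values into the winner's item. Below is a sketch of the same effect using ConcurrentHashMap.computeIfAbsent. This is a swapped-in technique, not what RocketMQ does; the minimal FaultItem and the explicit `now` parameter are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical alternative to the get/putIfAbsent sequence (not the RocketMQ source).
class FaultTableUpdateSketch {

    static final class FaultItem {
        final String name;
        volatile long currentLatency;
        volatile long startTimestamp;

        FaultItem(String name) {
            this.name = name;
        }
    }

    final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<>();

    void updateFaultItem(String name, long currentLatency, long notAvailableDuration, long now) {
        // computeIfAbsent on ConcurrentHashMap creates the entry atomically, at most once per key.
        FaultItem item = faultItemTable.computeIfAbsent(name, FaultItem::new);
        item.currentLatency = currentLatency;
        item.startTimestamp = now + notAvailableDuration;
    }

    public static void main(String[] args) {
        FaultTableUpdateSketch t = new FaultTableUpdateSketch();
        t.updateFaultItem("broker-a", 700, 30_000, 1_000);
        t.updateFaultItem("broker-a", 40, 0, 5_000); // a later fast send clears the penalty
        FaultItem item = t.faultItemTable.get("broker-a");
        System.out.println(item.currentLatency + " " + item.startTimestamp); // 40 5000
    }
}
```

As in the original, the two field writes are not atomic as a pair, so a concurrent reader may briefly see a new latency with an old timestamp; for a heuristic like this that is usually acceptable.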
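Feature 2: MQClient pulls and handles NameServer changes
updateTopicRouteInfoFromNameServer revisited
Part 7 of this series explained that updateTopicRouteInfoFromNameServer tracks broker changes reported by the NameServer and updates the related producer and consumer metadata accordingly.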
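In addition, MQClientInstance.scheduledExecutorService calls updateTopicRouteInfoFromNameServer every 30 s, which also triggers updates to the producer and consumer metadata.
In short: changes to broker registration reach the client because the MQClient actively pulls them, so the client may see them with some delay.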
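The pull model amounts to a fixed-rate task. A hypothetical sketch with java.util.concurrent.ScheduledExecutorService: the `Function` standing in for the NameServer, the integer "route version" standing in for TopicRouteData, and the 100 ms interval in main are all assumptions for illustration (the article states a 30 s interval for the real client).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical pull-based route refresher (not RocketMQ code).
class RoutePollerSketch {
    private final Function<String, Integer> nameServer; // topic -> route version (stand-in for TopicRouteData)
    final Map<String, Integer> topicRouteTable = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    RoutePollerSketch(Function<String, Integer> nameServer) {
        this.nameServer = nameServer;
    }

    // One polling round: fetch, compare with the cached copy, report whether it changed.
    boolean pollOnce(String topic) {
        Integer latest = nameServer.apply(topic);
        if (latest == null) {
            return false; // topic unknown to the "NameServer": keep the cached route
        }
        Integer old = topicRouteTable.put(topic, latest);
        return !latest.equals(old);
    }

    void start(String topic, long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                if (pollOnce(topic)) {
                    System.out.println("route changed: " + topicRouteTable.get(topic));
                }
            } catch (RuntimeException e) {
                // An exception escaping the task would suppress all later runs.
                e.printStackTrace();
            }
        }, 10, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        Map<String, Integer> remote = new ConcurrentHashMap<>();
        remote.put("TopicTest", 1);
        RoutePollerSketch poller = new RoutePollerSketch(remote::get);
        poller.start("TopicTest", 100);
        Thread.sleep(250);
        remote.put("TopicTest", 2); // the broker set changes on the "NameServer"
        Thread.sleep(250);          // the client notices only on its next poll
        poller.shutdown();
    }
}
```

The gap between `remote.put` and the next "route changed" line is the delay the paragraph above describes: the client learns about a change no sooner than its next polling round.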
The method checks whether the route has changed. If it has, it updates the producer metadata and then the consumer metadata.
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) {
    // ...... other code omitted
    TopicRouteData old = this.topicRouteTable.get(topic);
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    if (!changed) {
        // Route data unchanged: still check whether any producer or consumer needs an update
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    }
    // The route has changed
    if (changed) {
        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
        for (BrokerData bd : topicRouteData.getBrokerDatas()) {
            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
        }
        // Update producer metadata
        {
            TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
            publishInfo.setHaveTopicRouterInfo(true);
            Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQProducerInner> entry = it.next();
                MQProducerInner impl = entry.getValue();
                if (impl != null) {
                    impl.updateTopicPublishInfo(topic, publishInfo);
                }
            }
        }
        // Update consumer metadata
        {
            Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
            Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<String, MQConsumerInner> entry = it.next();
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    impl.updateTopicSubscribeInfo(topic, subscribeInfo);
                }
            }
        }
        log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
        this.topicRouteTable.put(topic, cloneTopicRouteData);
        return true;
    }
    // ...... other code omitted
}
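topicRouteDataIsChange itself is not shown above. For this walkthrough it is enough to think of it as an order-insensitive comparison of the cached route with the freshly fetched one. A hypothetical sketch under that assumption, with a route reduced to a brokerName to write-queue-count map; treating a missing side as "changed" is also an assumption here:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified route-change check (not RocketMQ's TopicRouteData).
class RouteChangeSketch {

    // brokerName -> number of write queues; Map.equals ignores iteration order.
    static boolean isChanged(Map<String, Integer> oldRoute, Map<String, Integer> newRoute) {
        if (oldRoute == null || newRoute == null) {
            return true; // nothing cached yet (or nothing fetched): treat it as a change
        }
        return !Objects.equals(oldRoute, newRoute);
    }

    public static void main(String[] args) {
        Map<String, Integer> cached = new HashMap<>();
        cached.put("broker-a", 4);
        cached.put("broker-b", 4);

        Map<String, Integer> fetched = new HashMap<>();
        fetched.put("broker-b", 4);
        fetched.put("broker-a", 4);
        System.out.println(isChanged(cached, fetched)); // false: same content, different insertion order

        fetched.remove("broker-b");                     // broker-b went offline
        System.out.println(isChanged(cached, fetched)); // true
        System.out.println(isChanged(null, fetched));   // true
    }
}
```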
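Source analysis: isNeedUpdateTopicRouteInfo
For the given topic, checks whether each producer and consumer holds information about it. If any of them has no such information, or holds invalid information, the producer and consumer metadata need to be updated.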
private boolean isNeedUpdateTopicRouteInfo(final String topic) {
    boolean result = false;
    {
        Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            Entry<String, MQProducerInner> entry = it.next();
            MQProducerInner impl = entry.getValue();
            if (impl != null) {
                // Each MQProducer comes from MQClientInstance.producerTable;
                // check whether its topicPublishInfoTable has a valid entry for the topic
                result = impl.isPublishTopicNeedUpdate(topic);
            }
        }
    }
    {
        Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext() && !result) {
            Entry<String, MQConsumerInner> entry = it.next();
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                // Each MQConsumer comes from MQClientInstance.consumerTable;
                // check whether its rebalanceImpl.topicSubscribeInfoTable has an entry for the topic
                result = impl.isSubscribeTopicNeedUpdate(topic);
            }
        }
    }
    return result;
}
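Both loops stop as soon as one registered client reports that it needs an update (the `!result` in each loop condition), and the consumer loop does not run at all once a producer has asked. A hypothetical sketch of that short-circuit scan; the rule "needs an update when the topic is unknown or has no queues" is an assumption standing in for isPublishTopicNeedUpdate.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the short-circuit scan in isNeedUpdateTopicRouteInfo.
class NeedUpdateSketch {

    interface Client {
        boolean needsUpdate(String topic);
    }

    // Assumed rule: a producer needs an update if the topic is unknown or has no queues.
    static final class Producer implements Client {
        final Map<String, Integer> topicQueueCount = new HashMap<>();
        int checks = 0; // how often this producer was asked

        public boolean needsUpdate(String topic) {
            checks++;
            Integer n = topicQueueCount.get(topic);
            return n == null || n == 0;
        }
    }

    static boolean isNeedUpdate(List<? extends Client> producers, List<? extends Client> consumers, String topic) {
        boolean result = false;
        for (int i = 0; i < producers.size() && !result; i++) {
            result = producers.get(i).needsUpdate(topic);
        }
        for (int i = 0; i < consumers.size() && !result; i++) {
            result = consumers.get(i).needsUpdate(topic);
        }
        return result;
    }

    public static void main(String[] args) {
        Producer ok = new Producer();
        ok.topicQueueCount.put("TopicTest", 8);
        Producer missing = new Producer(); // no publish info for TopicTest yet
        Producer neverAsked = new Producer();

        List<Producer> producers = List.of(ok, missing, neverAsked);
        List<Client> consumers = new ArrayList<>();
        consumers.add(topic -> true);

        System.out.println(isNeedUpdate(producers, consumers, "TopicTest")); // true
        System.out.println(neverAsked.checks);                              // 0: the scan stopped early
    }
}
```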
Producer update: updateTopicPublishInfo
Updates topicPublishInfoTable.
public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
    if (info != null && topic != null) {
        TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
        if (prev != null) {
            log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
        }
    }
}
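Consumer update: updateTopicSubscribeInfo
The consumer updates rebalanceImpl.topicSubscribeInfoTable. After topicSubscribeInfoTable is updated, the consumer rebalances, which changes how messages are pulled and therefore affects the whole consumption process (covered in detail in the later article on consumption).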
public void updateTopicSubscribeInfo(String topic, Set<MessageQueue> info) {
    Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    if (subTable != null) {
        // Only topics this consumer actually subscribes to are cached
        if (subTable.containsKey(topic)) {
            this.rebalanceImpl.topicSubscribeInfoTable.put(topic, info);
        }
    }
}
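The containsKey guard means a consumer caches queue sets only for topics it subscribes to; route updates for any other topic are ignored. A tiny hypothetical sketch, with strings standing in for MessageQueue and SubscriptionData:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the subscription guard in updateTopicSubscribeInfo.
class SubscribeInfoSketch {
    final Map<String, String> subscriptions = new HashMap<>();                // topic -> subscription expression
    final Map<String, Set<String>> topicSubscribeInfoTable = new HashMap<>(); // topic -> queue ids

    void updateTopicSubscribeInfo(String topic, Set<String> info) {
        if (subscriptions.containsKey(topic)) {
            topicSubscribeInfoTable.put(topic, info);
        }
    }

    public static void main(String[] args) {
        SubscribeInfoSketch consumer = new SubscribeInfoSketch();
        consumer.subscriptions.put("TopicTest", "*");
        consumer.updateTopicSubscribeInfo("TopicTest", Set.of("broker-a#0", "broker-a#1"));
        consumer.updateTopicSubscribeInfo("OtherTopic", Set.of("broker-b#0")); // not subscribed: ignored
        System.out.println(consumer.topicSubscribeInfoTable.keySet());         // [TopicTest]
    }
}
```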
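Summary
The send latency fault tolerance mechanism is disabled by default; enabling it improves sending to some extent by steering sends away from slow or failing brokers. NameServer changes are pulled and processed periodically by the MQClient, which first updates its own metadata and then the related producer and consumer metadata, as shown in the figure below.
Producers and consumers register in the MQClient's producerTable and consumerTable. The MQClient periodically pulls information from the NameServer, updates topicRouteTable and brokerAddrTable, and then triggers metadata updates in the upper-layer producers and consumers.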



