- 前言
- 流程解析
- 总结
在上一篇博客中我们了解到,PullMessageService线程主要是负责从pullRequestQueue中获得拉取消息请求并进行请求处理的。
PullMessageService#run
//在拉取消息请求队列中拉取消息请求 PullRequest pullRequest = this.pullRequestQueue.take(); //处理请求 this.pullMessage(pullRequest);
但是pullRequestQueue中的PullRequest是从哪来的呢?是什么时候由谁进行填充的呢?
private final linkedBlockingQueue流程解析pullRequestQueue = new linkedBlockingQueue ();
通过pullRequestQueue中的PullRequest添加操作这个线索一步步跟踪下去,最后得出了pullRequestQueue的调用链:
RebalanceService#run ↓ MQClientInstance#doRebalance ↓ DefaultMQPullConsumerImpl#doRebalance ↓ RebalanceImpl#doRebalance ↓ RebalanceImpl#rebalanceByTopic ↓ RebalanceImpl#updateProcessQueueTableInRebalance ↓ RebalancePushImpl#dispatchPullRequest ↓ DefaultMQPushConsumerImpl#executePullRequestImmediately ↓ PullMessageService#executePullRequestImmediately
由上面的调用链我们可以看到,向PullMessageService中的linkedBlockingQueue
RebalanceService
public class RebalanceService extends ServiceThread {
//等待时间
private static long waitInterval =
Long.parseLong(System.getProperty(
"rocketmq.client.rebalance.waitInterval", "20000"));
private final InternalLogger log = ClientLogger.getLog();
//消息客户端
private final MQClientInstance mqClientFactory;
public RebalanceService(MQClientInstance mqClientFactory) {
this.mqClientFactory = mqClientFactory;
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(waitInterval);
//进入mqClientFactory
this.mqClientFactory.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
}
RebalanceService是一个服务线程,其run方法主要是调用MQClientInstance#doRebalance进行重新负载。
MQClientInstance
private final RebalanceService rebalanceService;
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
...
// Start rebalance service
//负载均衡服务启动
this.rebalanceService.start();
}
}
}
MQClientInstance持有一个RebalanceService线程,在start方法中开启该线程。
MQClientInstance#doRebalance
//循环遍历每个消费者组中的MQConsumerInner(即DefaultMQPushConsumerImpl)并调用其doRebalance for (Map.Entry entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } }
DefaultMQPushConsumerImpl#doRebalance
@Override
public void doRebalance() {
if (!this.pause) {
this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
}
}
经过多层的对象委托,终于来到实现消息负载分发的核心。
RebalanceImpl
//消息处理队列 protected final ConcurrentMapprocessQueueTable = new ConcurrentHashMap (64); //Topic的队列信息 protected final ConcurrentMap > topicSubscribeInfoTable = new ConcurrentHashMap >(); //Topic订阅信息 protected final ConcurrentMap subscriptionInner = new ConcurrentHashMap (); //消费者组 protected String consumerGroup; //消费模式 protected MessageModel messageModel; //队列分配策略 protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; //MQ客户端 protected MQClientInstance mQClientFactory;
RebalanceImpl#doRebalance
public void doRebalance(final boolean isOrder) {
Map subTable = this.getSubscriptionInner();
if (subTable != null) {
for (final Map.Entry entry : subTable.entrySet()) {
final String topic = entry.getKey();
try {
//根据Topic来对队列进行重新负载
this.rebalanceByTopic(topic, isOrder);
} catch (Throwable e) {
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("rebalanceByTopic Exception", e);
}
}
}
}
//如果消息队列的Topic不在订阅的主题中-删除该消息队列
this.truncateMessageQueueNotMyTopic();
}
RebalanceImpl#rebalanceByTopic
//从主题订阅消息缓存表中获取主题的队列信息 SetmqSet = this.topicSubscribeInfoTable.get(topic); //查找该主题订阅组所有的消费者ID List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); //消费模式 switch (messageModel) { case BROADCASTING: { ... break; } case CLUSTERING: { if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } //对主题的消息队列和消费者ID进行排序 if (mqSet != null && cidAll != null) { List mqAll = new ArrayList (); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); //获取当前负载均衡策略 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List allocateResult = null; try { //根据策略对消息队列进行重新分配 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set allocateResultSet = new HashSet (); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } //重新负载后-对消息消费队列进行更新-返回消息队列负载是否发生变化 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { log.info( "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), allocateResultSet.size(), allocateResultSet); this.messageQueueChanged(topic, mqSet, allocateResultSet); } } break; } default: break; }
Rebalance#updateProcessQueueTableInRebalance
boolean changed = false; //消息队列负载是否发生变化 Iterator> it = this.processQueueTable.entrySet().iterator(); //遍历<消息队列,处理队列>缓存表 while (it.hasNext()) { Entry next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); //如果消息队列不在该Topic处理范围内 if (mq.getTopic().equals(topic)) { //消息队列已经被分配到其他消费者去消费了-不包含在当前主题的Set 中 if (!mqSet.contains(mq)) { //private volatile boolean dropped; //设置当前处理队列为被丢弃-及时阻止继续向该消息处理队列进行消息拉取 pq.setDropped(true); //判断是否需要移除 if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); //发生变化 changed = true; log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq); } } else if (pq.isPullExpired()) { switch (this.consumeType()) { case CONSUME_ACTIVELY: break; case CONSUME_PASSIVELY: pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq); } break; default: break; } } } }
RebalancePushImpl#removeUnnecessaryMessageQueue
//丢弃消息队列之前先将消息队列进行持久化
//保存在本地(LocalFileOffsetStore)/消息服务器Broker(RemoteBrokerOffsetStore)
this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq);
this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq);
//顺序消费进入的分支
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try {
if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
return this.unlockDelay(mq, pq);
} finally {
pq.getConsumeLock().unlock();
}
} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
mq,
pq.getTryUnlockTimes());
pq.incTryUnlockTimes();
}
} catch (Exception e) {
log.error("removeUnnecessaryMessageQueue Exception", e);
}
return false;
}
//暂时只看非顺序消息-返回true
return true;
RebalanceImpl#updateProcessQueueTableInRebalance
ListpullRequestList = new ArrayList (); //遍历消息队列 for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } this.removeDirtyOffset(mq); ProcessQueue pq = new ProcessQueue(); long nextOffset = -1L; try { //根据不同的消息消费策略获取下一次消费的偏移量 //CONSUME_FROM_LAST_OFFSET/CONSUME_FROM_FIRST_OFFSET/CONSUME_FROM_TIMESTAMP nextOffset = this.computePullFromWhereWithException(mq); } catch (Exception e) { log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq); continue; } if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); //消息队列已经存在 if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { //消息队列不存在-新添加 log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); //封装拉取请求PullRequest PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); //放入拉取请求列表 pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } } //分发消息拉取请求 this.dispatchPullRequest(pullRequestList); return changed;
RebalancePushImpl#dispatchPullRequest
@Override public void dispatchPullRequest(ListpullRequestList) { //遍历请求列表 for (PullRequest pullRequest : pullRequestList) { //立刻拉取消息 this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest); log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest); } }
DefaultMQPushConsumerImpl#executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {
//将请求丢入PullMessageService线程中
this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest);
}
PullMessageService
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
//放入消息拉取请求队列中
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
总结
本文主要解析了消息消费端的负载机制,首先RebalanceService线程启动,为消息消费者分发消息队列,每一个MessageQueue消息队列都回构建一个PullRequest,通过将这个PullRequest放入PullMessageService中的pullRequestQueue,进而唤醒PullMessageService#run,在pullRequestQueue中获得拉取消息请求并进行处理。从上一篇的的消息拉取分析中我们可以得知,接下来执行DefaultMQPushConsumerImpl#pullMessage,通过网络远程调用从Broker中拉取消息,一次最多拉取消息数量默认为32条,然后Broker将拉取的消息进行过滤并封装后返回。返回之后再回到消息消费端,将消费任务提交到消费者的ConsumerMessageService执行消息的消费。
本文仅作为个人学习使用,如有不足或错误请指正!



