目录:
- 序言:PullCallback
- 消费原理图
- ConsumeMessageOrderlyService — 顺序消费:消费服务构造函数、提交消费请求、消费执行
- ConsumeMessageConcurrentlyService — 并发消费:消费服务构造函数、提交消费请求、消费执行
序言:PullCallback
broker 响应 mqclient 后,执行 pullCallback 回调:先触发消息存储到消费缓冲区 processQueue,再触发消费线程消费消息。
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
...... 删除其他代码
统计rt信息
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
如果消息体数量为0 则重新拉消息
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
增加统计信息
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
核心1: 将拉取到的32条消息添加到processQueue的msgTreeMap
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
核心2: 将拉取到的32条消息构建consumerequest 给消费线程消费
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
...... 交给PullRequestService重新拉取
}
...... 删除其他代码
}
};
消费原理图
ConsumeMessageOrderlyService — 顺序消费
消费服务构造函数
虽然是 Orderly 顺序消费,但依旧是多线程执行,通过锁保障顺序性。
class ConsumeRequest implements Runnable {
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new linkedBlockingQueue();
this.consumeExecutor = new ThreadPoolExecutor(
// 默认20线程
this.defaultMQPushConsumer.getConsumeThreadMin(),
// 默认20线程
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
// 无界阻塞队列
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_")
// 默认异常策略
);
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}
提交消费请求
不同于并发消费,顺序消费的消息不是显式入参,而是基于 processQueue 进行消息消费;ConsumeRequest 若已存在,则无需多次构建。
public void submitConsumeRequest(
final List msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
只有加入processQueue内含有消息 并且没有已经在处理的ConsumeRequest
则构建新的ConsumeRequest提交
if (dispathToConsume) {
consumeExecutor消费直接基于processQueue消费消息
假设已经有老的ConsumeRequest在处理,则无需构建新的ConsumeRequest 因为新老processQueue相同,消息集合都位于processQueue,一个ConsumeRequest即可
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
消费执行
ConsumeMessageOrderlyService.ConsumeRequest
consumeExecutor线程池执行ConsumeRequest.run方法
虽然多线程,但依据submitConsumeRequest,一般也就单线程执行
顺序性基于锁而非线程数保障
基于 processQueue 进行顺序消费,流程:获取分页大小 → 根据分页大小获取 offset 有序的消息集合 → 前置钩子处理 → 基于消息队列 messageQueue 与处理队列 processQueue 两个维度加锁消费 → 后置钩子处理 → 处理重试逻辑[阻塞]。
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
...... 删除其他代码
@Override
public void run() {
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
获取当前 messageQueue 的锁
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (
this.processQueue.isLocked() && !this.processQueue.isLockExpired()
)
) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
...... 删除其他代码
获取分页大小
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
一般默认一条
processQueue消息存储格式如下 TreeMap msgTreeMap
根据message_offset消息偏移量构建有序map,所以t获取的消息保障了偏移量从小到大有序性
List msgs = this.processQueue.takeMessags(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
...... 删除代码: 执行before hook
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
processQueue维度加锁
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
消费
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}
...... 删除其他代码
long consumeRT = System.currentTimeMillis() - beginTimestamp;
...... 删除其他代码以及后置钩子执行
rt时间统计
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
处理重试逻辑[阻塞]
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
稍后重消费
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
}
ConsumeMessageConcurrentlyService — 并发消费
消费服务构造函数
consumeExecutor 处理 ConsumeRequest 任务(Runnable)进行消息消费;当出现拒绝异常时,会通过 submitConsumeRequestLater 稍后重新提交到消费线程池。
/**
 * Builds the concurrent consume service: stores the consumer references and creates
 * the consume thread pool plus two single-threaded schedulers.
 *
 * @param defaultMQPushConsumerImpl push-consumer implementation that owns this service
 * @param messageListener           concurrent listener that will receive the messages
 */
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
    MessageListenerConcurrently messageListener) {
    this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
    this.messageListener = messageListener;

    this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
    this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
    // No-arg LinkedBlockingQueue is effectively unbounded (capacity Integer.MAX_VALUE).
    this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

    // Consume thread pool parameters.
    this.consumeExecutor = new ThreadPoolExecutor(
        // core pool size (20 by default, per the original note)
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        // maximum pool size (20 by default, per the original note)
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        // idle keep-alive: 60 seconds, expressed in milliseconds
        1000 * 60,
        TimeUnit.MILLISECONDS,
        // unbounded blocking work queue
        this.consumeRequestQueue,
        new ThreadFactoryImpl("ConsumeMessageThread_")
        // no handler argument is passed, so the executor's default rejection policy applies
    );

    this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
    this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
提交消费请求
交由 consumeExecutor 线程池进行消费:基于拉取的 msgs 直接分页后提交消费线程池。
/**
 * Splits the pulled messages into batches of at most the configured consume batch
 * size and submits one consume request per batch to the consume thread pool.
 *
 * <p>If the pool rejects a request, that request is handed to
 * {@code submitConsumeRequestLater}; inside the batching loop the rejected request
 * first absorbs every message that has not been batched yet.
 *
 * @param msgs              messages that were just pulled
 * @param processQueue      local buffer the messages were stored in
 * @param messageQueue      message queue the messages came from
 * @param dispatchToConsume not read by this method
 */
public void submitConsumeRequest(
    final List msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // Maximum number of locally buffered messages handed to the listener at once;
    // usually the default of one (per the original note).
    final int batchLimit = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

    // Everything fits into a single request: submit it as-is and finish.
    if (msgs.size() <= batchLimit) {
        ConsumeRequest wholeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(wholeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(wholeRequest);
        }
        return;
    }

    // Otherwise page through the pull result, e.g. 32 pulled messages consumed one
    // at a time with the default settings (per the original note).
    int cursor = 0;
    while (cursor < msgs.size()) {
        List batch = new ArrayList(batchLimit);
        for (int taken = 0; taken < batchLimit && cursor < msgs.size(); taken++, cursor++) {
            batch.add(msgs.get(cursor));
        }

        ConsumeRequest batchRequest = new ConsumeRequest(batch, processQueue, messageQueue);
        try {
            this.consumeExecutor.submit(batchRequest);
        } catch (RejectedExecutionException e) {
            // The request shares the batch list, so appending the remaining messages
            // here makes the deferred request carry all of them; the cursor then sits
            // at the end and the outer loop stops.
            while (cursor < msgs.size()) {
                batch.add(msgs.get(cursor));
                cursor++;
            }

            this.submitConsumeRequestLater(batchRequest);
        }
    }
}
消费执行
ConsumeMessageConcurrentlyService.ConsumeRequest
consumeExecutor线程池执行ConsumeRequest.run方法
直接对 ConsumeRequest.msgs 进行消费,无需处理顺序性。流程:若因再平衡或者其他相关策略导致队列被丢弃,则无需消费 → 获取业务执行 listener → 执行 Before Hook → 执行消费逻辑[一个分页合集,默认 1 条] → 执行 After Hook → 根据 ackIndex 处理重试逻辑。
/**
 * Concurrent consume task: passes one batch of messages to the concurrent listener,
 * runs the before/after hooks when hooks are registered, records the consume time
 * and forwards the outcome to {@code processConsumeResult}.
 */
class ConsumeRequest implements Runnable {
    /** Batch of messages this request consumes. */
    private final List<MessageExt> msgs;
    /** Per the original note, the pulled messages are also stored in processQueue.msgTreeMap. */
    private final ProcessQueue processQueue;
    /** Message queue that the pulled messages belong to. */
    private final MessageQueue messageQueue;

    /**
     * @param msgs         batch of messages to consume
     * @param processQueue local buffer the batch was stored in
     * @param messageQueue message queue the batch was pulled from
     */
    public ConsumeRequest(List<MessageExt> msgs, ProcessQueue processQueue, MessageQueue messageQueue) {
        this.msgs = msgs;
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    /** @return the batch of messages held by this request */
    public List<MessageExt> getMsgs() {
        return msgs;
    }

    /** @return the process queue held by this request */
    public ProcessQueue getProcessQueue() {
        return processQueue;
    }

    @Override
    public void run() {
        // The queue was dropped (rebalance or another policy, per the original note):
        // nothing to consume.
        if (this.processQueue.isDropped()) {
            log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
            return;
        }

        // Business listener that executes the actual consume logic.
        MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
        ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
        ConsumeConcurrentlyStatus status = null;
        defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

        // Stays null unless hooks are registered at this point.
        ConsumeMessageContext consumeMessageContext = null;
        if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
            consumeMessageContext.setProps(new HashMap<String, String>());
            consumeMessageContext.setMq(messageQueue);
            consumeMessageContext.setMsgList(msgs);
            consumeMessageContext.setSuccess(false);
            // Run the before-hook ahead of consuming.
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
        }

        long beginTimestamp = System.currentTimeMillis();
        boolean hasException = false;
        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
        try {
            // Stamp every message with the time its consumption started.
            if (msgs != null && !msgs.isEmpty()) {
                for (MessageExt msg : msgs) {
                    MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                }
            }
            // Execute the consume logic on one batch (one message by default, per the
            // original note). The listener receives a read-only view of the batch.
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
        } catch (Throwable e) {
            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                RemotingHelper.exceptionSimpleDesc(e),
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            hasException = true;
        }
        long consumeRT = System.currentTimeMillis() - beginTimestamp;

        // Classify the outcome for the hook context.
        if (null == status) {
            if (hasException) {
                returnType = ConsumeReturnType.EXCEPTION;
            } else {
                returnType = ConsumeReturnType.RETURNNULL;
            }
        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
            // The timeout value is scaled by 60 * 1000 before being compared with the
            // elapsed milliseconds.
            returnType = ConsumeReturnType.TIME_OUT;
        } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
            returnType = ConsumeReturnType.FAILED;
        } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
            returnType = ConsumeReturnType.SUCCESS;
        }

        // The null guard covers the case where hasHook() was false when the context
        // would have been created but is true now.
        if (consumeMessageContext != null
            && ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
        }

        // A null status (listener returned null or threw) is treated as RECONSUME_LATER.
        if (null == status) {
            log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                ConsumeMessageConcurrentlyService.this.consumerGroup,
                msgs,
                messageQueue);
            status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        if (consumeMessageContext != null
            && ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
            consumeMessageContext.setStatus(status.toString());
            consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
            // Run the after-hook.
            ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
        }

        ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
            .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

        // Handle the result and retry logic based on ackIndex (retry consume topic,
        // per the original note), unless the queue was dropped in the meantime.
        if (!processQueue.isDropped()) {
            ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
        } else {
            log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
        }
    }

    /** @return the message queue held by this request */
    public MessageQueue getMessageQueue() {
        return messageQueue;
    }
}



