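The previous sections analyzed how the consumer processes messages and how the consumer and broker maintain the consumption progress.
But what happens when the consumer throws an exception while consuming, or the listener asks for the message to be consumed again (ConsumeConcurrentlyStatus.RECONSUME_LATER)?
Start from org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest: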
public void submitConsumeRequest(
    final List<MessageExt> msgs,
    final ProcessQueue processQueue,
    final MessageQueue messageQueue,
    final boolean dispatchToConsume) {
    // Number of messages handed to the listener per call; defaults to 1
    final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
    if (msgs.size() <= consumeBatchSize) {
        ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
        try {
            // Submit directly to the executor
            this.consumeExecutor.submit(consumeRequest);
        } catch (RejectedExecutionException e) {
            this.submitConsumeRequestLater(consumeRequest);
        }
    } else {
        for (int total = 0; total < msgs.size(); ) {
            // Split the message list into chunks of consumeBatchSize
            List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
            for (int i = 0; i < consumeBatchSize; i++, total++) {
                if (total < msgs.size()) {
                    msgThis.add(msgs.get(total));
                } else {
                    break;
                }
            }
            // Wrap each chunk in its own ConsumeRequest
            ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                // Submission was rejected: append all remaining messages to this batch and resubmit it later
                for (; total < msgs.size(); total++) {
                    msgThis.add(msgs.get(total));
                }
                this.submitConsumeRequestLater(consumeRequest);
            }
        }
    }
}
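The chunking done by submitConsumeRequest() can be illustrated in isolation. The following is a minimal sketch, not RocketMQ code: the class BatchSplitSketch and its split() method are hypothetical names, and the rejected-submission path is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of RocketMQ) that mirrors the chunking in submitConsumeRequest(). */
final class BatchSplitSketch {

    /** Splits msgs into consecutive chunks of at most batchSize elements, preserving order. */
    static <T> List<List<T>> split(List<T> msgs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < msgs.size(); start += batchSize) {
            int end = Math.min(start + batchSize, msgs.size());
            // Copy the sub-range so each batch is an independent list
            batches.add(new ArrayList<>(msgs.subList(start, end)));
        }
        return batches;
    }
}
```

With five messages and a batch size of 2, this yields three batches of sizes 2, 2 and 1. In the real method each batch becomes one ConsumeRequest.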
If submitting to the thread pool fails, submitConsumeRequestLater() is called:
private void submitConsumeRequestLater(final ConsumeRequest consumeRequest
) {
    this.scheduledExecutorService.schedule(new Runnable() {
        @Override
        public void run() {
            ConsumeMessageConcurrentlyService.this.consumeExecutor.submit(consumeRequest);
        }
    }, 5000, TimeUnit.MILLISECONDS);
}
In other words, the request is resubmitted to the thread pool after 5 seconds.
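The "submit now, or retry the submission later" pattern can be sketched with plain java.util.concurrent classes. This is only an illustration under assumptions: DeferredSubmitSketch and submitOrDefer() are hypothetical names, and the delay is a parameter here rather than the fixed 5000 ms used by RocketMQ.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the deferral pattern; not the RocketMQ implementation. */
final class DeferredSubmitSketch {

    /**
     * Tries to submit the task right away. If the worker pool rejects it,
     * schedules one more submission attempt after delayMillis.
     *
     * @return true if the task was accepted immediately, false if it was deferred
     */
    static boolean submitOrDefer(ExecutorService worker,
                                 ScheduledExecutorService scheduler,
                                 Runnable task,
                                 long delayMillis) {
        try {
            worker.submit(task);
            return true;
        } catch (RejectedExecutionException e) {
            // Block lambda so that the Runnable overload of schedule() is chosen.
            // As in the code above, a rejection of the second attempt is not handled.
            scheduler.schedule(() -> { worker.submit(task); }, delayMillis, TimeUnit.MILLISECONDS);
            return false;
        }
    }
}
```

A separate scheduler thread is what makes this work: the worker pool is full at the moment of rejection, so the retry has to be triggered from somewhere else.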
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService.ConsumeRequest#run
public void run() {
    ...
    // Assume success by default
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        ...
        // Invoke the listener to consume the messages
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue), e);
        hasException = true;
    }
    // Time spent consuming
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    if (null == status) {
        if (hasException) {
            // The listener threw an exception
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            // No exception, but the listener returned null
            returnType = ConsumeReturnType.RETURNNULL;
        }
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        // Consumption timed out; the default timeout is 15 minutes
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        // The listener asked for a retry; this is recorded as a failure
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        // The listener reported success
        returnType = ConsumeReturnType.SUCCESS;
    }
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }
    // An exception or a null return value is converted into RECONSUME_LATER
    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }
    // Record the consume RT
    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
    // Process the consume result
    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}
As the code shows, if the user's listener throws an uncaught exception or returns null, the result is treated as ConsumeConcurrentlyStatus.RECONSUME_LATER.
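The decision logic in run() can be condensed into two small pure functions. This is a sketch with stand-in types: ConsumeResultSketch and its nested enums are hypothetical and only mirror the names of the RocketMQ enums used above.

```java
/** Hypothetical stand-in types; they only mirror the names used in ConsumeRequest#run. */
final class ConsumeResultSketch {

    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    enum ReturnType { SUCCESS, TIME_OUT, EXCEPTION, RETURNNULL, FAILED }

    /** Derives the return type reported to hooks, in the same order of checks as run(). */
    static ReturnType classify(Status status, boolean hasException,
                               long consumeRtMillis, long timeoutMinutes) {
        if (status == null) {
            return hasException ? ReturnType.EXCEPTION : ReturnType.RETURNNULL;
        }
        if (consumeRtMillis >= timeoutMinutes * 60 * 1000) {
            return ReturnType.TIME_OUT;
        }
        if (status == Status.RECONSUME_LATER) {
            return ReturnType.FAILED;
        }
        return ReturnType.SUCCESS;
    }

    /** The status that is actually passed on: a null status becomes RECONSUME_LATER. */
    static Status effectiveStatus(Status status) {
        return status == null ? Status.RECONSUME_LATER : status;
    }
}
```

Note that TIME_OUT only changes the return type seen by hooks. In the code shown above, a listener that returns CONSUME_SUCCESS after the timeout still has its status passed on unchanged to processConsumeResult().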
public void processConsumeResult(
    final ConsumeConcurrentlyStatus status,
    final ConsumeConcurrentlyContext context,
    final ConsumeRequest consumeRequest
) {
    // Read ackIndex; it defaults to Integer.MAX_VALUE when the ConsumeConcurrentlyContext is created
    int ackIndex = context.getAckIndex();
    if (consumeRequest.getMsgs().isEmpty())
        return;
    switch (status) {
        // Consumed successfully
        case CONSUME_SUCCESS:
            // Cap ackIndex at (message count - 1); with the default batch size of 1 this is 0
            if (ackIndex >= consumeRequest.getMsgs().size()) {
                ackIndex = consumeRequest.getMsgs().size() - 1;
            }
            ...
            break;
        case RECONSUME_LATER:
            // Needs to be consumed again: set ackIndex to -1 so the loop below retries every message
            ackIndex = -1;
            ...
            break;
        default:
            break;
    }
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            ...
            break;
        case CLUSTERING:
            List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
            // Send every message after ackIndex (the failed ones) back to the broker
            for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                MessageExt msg = consumeRequest.getMsgs().get(i);
                // Send the message back to the broker
                boolean result = this.sendMessageBack(msg, context);
                if (!result) {
                    msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                    msgBackFailed.add(msg);
                }
            }
            // Some messages could not be sent back to the broker
            if (!msgBackFailed.isEmpty()) {
                consumeRequest.getMsgs().removeAll(msgBackFailed);
                // Wrap these messages and run them through local consumption again after 5s
                this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
            }
            break;
        default:
            break;
    }
    // Remove this batch of messages from the processQueue.
    // If the processQueue is empty afterwards, everything pulled so far has been consumed,
    // and the returned offset is the largest message offset + 1.
    // If it is not empty, some pulled messages are still pending,
    // and the returned offset is that of the first remaining message.
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        // Update the local consumption progress; a scheduled task later syncs it to the broker
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
If consumption fails, sendMessageBack() is called and the local consumption offset is then updated. Why the offset is still updated in this case is analyzed later.
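How ackIndex selects the messages to send back can be checked with a small standalone sketch. AckIndexSketch and indexesToSendBack() are hypothetical names; the logic only restates the CLUSTERING branch of processConsumeResult().

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the ackIndex handling; not RocketMQ code. */
final class AckIndexSketch {

    /**
     * Returns the indexes (within one consume batch) of the messages that
     * would be sent back to the broker in CLUSTERING mode.
     *
     * @param success   true for CONSUME_SUCCESS, false for RECONSUME_LATER
     * @param ackIndex  value read from the context (Integer.MAX_VALUE by default)
     * @param batchSize number of messages in the ConsumeRequest
     */
    static List<Integer> indexesToSendBack(boolean success, int ackIndex, int batchSize) {
        if (success) {
            // Cap at the last index: by default everything counts as acknowledged
            if (ackIndex >= batchSize) {
                ackIndex = batchSize - 1;
            }
        } else {
            // Nothing is acknowledged
            ackIndex = -1;
        }
        List<Integer> result = new ArrayList<>();
        for (int i = ackIndex + 1; i < batchSize; i++) {
            result.add(i);
        }
        return result;
    }
}
```

With the default ackIndex, a successful batch sends nothing back and a failed batch sends every message back. If a listener lowers ackIndex on the context before returning CONSUME_SUCCESS, only the messages after that index are retried.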
public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
    // Delay level for the next consumption attempt; defaults to 0
    int delayLevel = context.getDelayLevelWhenNextConsume();
    // Wrap topic with namespace before sending back message.
    // The topic becomes namespace%topic
    msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
    try {
        // Send the retry message
        this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
        return true;
    } catch (Exception e) {
        log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
    }
    return false;
}
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    try {
        // Resolve the broker address
        String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
            : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
        // Send the retry request
        this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
            this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
    } catch (Exception e) {
        // Fallback: build a new message for the retry topic and send it with the client's internal producer
        log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);
        Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
        String originMsgId = MessageAccessor.getOriginMessageId(msg);
        MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
        newMsg.setFlag(msg.getFlag());
        MessageAccessor.setProperties(newMsg, msg.getProperties());
        MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
        MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
        MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
        MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
        newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
        this.mQClientFactory.getDefaultMQProducer().send(newMsg);
    } finally {
        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
    }
}
--
public void consumerSendMessageBack(
    final String addr,
    final MessageExt msg,
    final String consumerGroup,
    final int delayLevel,
    final long timeoutMillis,
    final int maxConsumeRetryTimes
) throws RemotingException, MQBrokerException, InterruptedException {
    ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
    // Only some basic information is sent, not the message body.
    // The broker reads the full message from the commitlog once the request arrives.
    requestHeader.setGroup(consumerGroup);
    requestHeader.setOriginTopic(msg.getTopic());
    requestHeader.setOffset(msg.getCommitLogOffset());
    requestHeader.setDelayLevel(delayLevel);
    requestHeader.setOriginMsgId(msg.getMsgId());
    requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
    // Send the request synchronously
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
        request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }
    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
}
The key fields of the message are packed into a request of type RequestCode.CONSUMER_SEND_MSG_BACK and sent to the broker.
--------
How the broker handles CONSUMER_SEND_MSG_BACK requests: SendMessageProcessor
SendMessageProcessor#asyncProcessRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand)
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final SendMessageContext mqtraceContext;
    switch (request.getCode()) {
        // The consumer failed to consume a message
        case RequestCode.CONSUMER_SEND_MSG_BACK:
            return this.asyncConsumerSendMsgBack(ctx, request);
        ...
    }
    ...
}
--
private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final ConsumerSendMsgBackRequestHeader requestHeader =
        (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
    String namespace = NamespaceUtil.getNamespaceFromResource(requestHeader.getGroup());
    // Various checks
    ...
    // retryQueueNums <= 0 means retries are not supported for this group
    if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return CompletableFuture.completedFuture(response);
    }
    // Retry topic name: "%RETRY%" + groupName
    String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    // Pick a queue at random
    int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
    int topicSysFlag = 0;
    if (requestHeader.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
    }
    // Get or create the retry topic (and register it with all name servers).
    // The retry topic is normally already created when the consumer starts and sends its heartbeat.
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    // Handling of a failed retry topic creation, plus further checks
    ...
    // The request carries only part of the message; read the full message from the commitLog
    MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
    if (null == msgExt) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("look message by offset failed, " + requestHeader.getOffset());
        return CompletableFuture.completedFuture(response);
    }
    // Keep the original topic name in the properties under the key RETRY_TOPIC (only if not already set)
    final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
    if (null == retryTopic) {
        MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
    }
    msgExt.setWaitStoreMsgOK(false);
    // Delay level
    int delayLevel = requestHeader.getDelayLevel();
    // Maximum number of retries
    int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
    // From version 3.4.9 on, the consumer can specify its own maximum number of retries
    if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
        Integer times = requestHeader.getMaxReconsumeTimes();
        if (times != null) {
            maxReconsumeTimes = times;
        }
    }
    // Maximum retries reached, or delay level < 0:
    // the message can no longer be consumed and goes to the dead letter queue.
    // The topic becomes "%DLQ%" + groupName and is created if necessary.
    // That topic is write-only, so consumers cannot read messages that land there; manual intervention is required.
    if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
        || delayLevel < 0) {
        // Dead letter topic
        newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
        // Queue id within the dead letter topic
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
        // Create the dead letter topic
        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
            DLQ_NUMS_PER_GROUP,
            PermName.PERM_WRITE, 0);
        if (null == topicConfig) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("topic[" + newTopic + "] not exist");
            return CompletableFuture.completedFuture(response);
        }
    } else {
        // If the delay level is 0, derive it from the retry count: reconsumeTimes + 3
        if (0 == delayLevel) {
            delayLevel = 3 + msgExt.getReconsumeTimes();
        }
        msgExt.setDelayTimeLevel(delayLevel);
    }
    // Build msgInner
    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    // Set the new topic
    msgInner.setTopic(newTopic);
    msgInner.setBody(msgExt.getBody());
    msgInner.setFlag(msgExt.getFlag());
    MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(msgExt.getSysFlag());
    msgInner.setBornTimestamp(msgExt.getBornTimestamp());
    msgInner.setBornHost(msgExt.getBornHost());
    msgInner.setStoreHost(msgExt.getStoreHost());
    // Increment the retry count
    msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
    // Original message id
    String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
    MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    // Store the message again, now under the retry (or dead letter) topic.
    // If it carries a delay level, the commit log rewrites the topic to SCHEDULE_TOPIC_XXXX when writing it.
    CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    return putMessageResult.thenApply((r) -> {
        ...
    });
}
The broker saves the message's original topic in the PROPERTY_RETRY_TOPIC property, sets the topic to %RETRY%+groupName,
and updates reconsumeTimes and delayLevel:
if (0 == delayLevel) {
    delayLevel = 3 + msgExt.getReconsumeTimes();
}
msgExt.setDelayTimeLevel(delayLevel);
msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
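The broker's choice between the retry topic and the dead letter topic can be summarized as a pure function. This is a sketch under assumptions: RetryRoutingSketch, Route and route() are hypothetical names, and a delay level of 0 in the result simply means "not set by this branch".

```java
/** Hypothetical sketch of the retry/DLQ decision in asyncConsumerSendMsgBack(); not RocketMQ code. */
final class RetryRoutingSketch {

    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    /** Target topic and delay level for a message that is sent back. */
    static final class Route {
        final String topic;
        final int delayLevel;

        Route(String topic, int delayLevel) {
            this.topic = topic;
            this.delayLevel = delayLevel;
        }
    }

    static Route route(String group, int reconsumeTimes, int maxReconsumeTimes, int requestedDelayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || requestedDelayLevel < 0) {
            // Retries exhausted, or the consumer explicitly asked for no more retries
            return new Route(DLQ_PREFIX + group, 0);
        }
        // Level 0 means "let the broker decide": 3 + number of retries so far
        int level = (requestedDelayLevel == 0) ? 3 + reconsumeTimes : requestedDelayLevel;
        return new Route(RETRY_PREFIX + group, level);
    }
}
```

For a group named g with a limit of 16 retries, the first failure is routed to %RETRY%g at level 3, and a message that has already been retried 16 times goes to %DLQ%g.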
Then messageStore().asyncPutMessage() is called:
DefaultMessageStore.putMessage() --> asyncPutMessage() --> commitLog.asyncPutMessage()
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    ...
    // Transaction type
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // Not a transactional message, or a transaction commit
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        // Delayed message
        if (msg.getDelayTimeLevel() > 0) {
            // Cap the level at the maximum delay level
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // Rewrite topic and queueId once more
            // topic: SCHEDULE_TOPIC_XXXX
            // queueId: delayTimeLevel - 1
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // Backup real topic, queueId
            // Save the original topic and queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    ...
}
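To make the delay levels concrete, the sketch below maps a level to its schedule queue id and to a delay in milliseconds. It rests on one assumption that does not come from the code above: that the broker uses what I believe is the default messageDelayLevel setting, "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h". DelayLevelSketch and its methods are hypothetical names.

```java
/** Hypothetical sketch; the level table is an assumed default, not read from a broker. */
final class DelayLevelSketch {

    // Assumed default value of the broker's messageDelayLevel setting
    static final String DEFAULT_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Queue id inside SCHEDULE_TOPIC_XXXX, as described above: delayTimeLevel - 1. */
    static int queueId(int delayLevel) {
        return delayLevel - 1;
    }

    /** Delay in milliseconds for a 1-based level; levels above the maximum are capped. */
    static long delayMillis(int delayLevel) {
        if (delayLevel < 1) {
            throw new IllegalArgumentException("delayLevel must be >= 1");
        }
        String[] tokens = DEFAULT_LEVELS.split(" ");
        int capped = Math.min(delayLevel, tokens.length);
        String token = tokens[capped - 1];
        long value = Long.parseLong(token.substring(0, token.length() - 1));
        switch (token.charAt(token.length() - 1)) {
            case 's':
                return value * 1000L;
            case 'm':
                return value * 60_000L;
            case 'h':
                return value * 3_600_000L;
            default:
                throw new IllegalArgumentException("unknown unit in " + token);
        }
    }
}
```

Under that assumption, the first retry (level 3) is written to queue 2 of the schedule topic and becomes visible again after about 10 seconds, the second retry (level 4) after about 30 seconds, and so on.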
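At this point, for a first retry with the consumer's delay level left at 0, delayLevel is 3 (3 + reconsumeTimes, where reconsumeTimes is 0). The PROPERTY_REAL_TOPIC property saves the topic (%RETRY%+groupName) and PROPERTY_REAL_QUEUE_ID saves the queueId.
The topic is then changed to SCHEDULE_TOPIC_XXXX and the queueId to delayTimeLevel-1,
and the message is written to the commitlog.
In effect, the message is moved into the delay queue, which the next section analyzes.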



