Broker作为服务端接收请求的流程如下图:
接收到请求之后,我们着重看一下NettyRemotingAbstract#processRequestCommand()方法,看一下它是怎么处理客户端的请求的。
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
// 根据Command的Code从processorTable中获取相应的事件处理器和线程池
final Pair matched = this.processorTable.get(cmd.getCode());
// 找不到事件处理器,则使用默认的处理器
final Pair pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
final RemotingResponseCallback callback = new RemotingResponseCallback() {
@Override
public void callback(RemotingCommand response) {
doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
if (!cmd.isOnewayRPC()) {
if (response != null) {
response.setOpaque(opaque);
response.markResponseType();
try {
ctx.writeAndFlush(response);
} catch (Throwable e) {
log.error("process request over, but response failed", e);
log.error(cmd.toString());
log.error(response.toString());
}
} else {
}
}
}
};
if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
// 从 pair 中拿到 Processor事件处理器
AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1();
// 处理请求
processor.asyncProcessRequest(ctx, cmd, callback);
} else {
NettyRequestProcessor processor = pair.getObject1();
RemotingCommand response = processor.processRequest(ctx, cmd);
callback.callback(response);
}
} catch (Throwable e) {
log.error("process request exception", e);
log.error(cmd.toString());
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
RemotingHelper.exceptionSimpleDesc(e));
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
}
};
if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
return;
}
try {
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
} catch (RejectedExecutionException e) {
if ((System.currentTimeMillis() % 10000) == 0) {
log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
+ ", too many requests and system thread pool busy, RejectedExecutionException "
+ pair.getObject2().toString()
+ " request code: " + cmd.getCode());
}
if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[OVERLOAD]system busy, start flow control for a while");
response.setOpaque(opaque);
ctx.writeAndFlush(response);
}
}
} else {
String error = " request type " + cmd.getCode() + " not supported";
final RemotingCommand response =
RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
response.setOpaque(opaque);
ctx.writeAndFlush(response);
log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
}
}
因为ClientManageProcessor是AsyncNettyRequestProcessor的子类,所以会走asyncProcessRequest()方法异步处理请求。
在AsyncNettyRequestProcessor#asyncProcessRequest()中会调用子类的processRequest()方法实现:
Broker处理心跳也就在ClientManageProcessor#processRequest()中:
上面提到了Broker处理心跳是在ClientManageProcessor#heartBeat()方法中处理的,下面我们来看一下heartBeat()方法:
public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
ctx.channel(),
heartbeatData.getClientID(),
request.getLanguage(),
request.getVersion()
);
// 处理心跳包中的消费者信息
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
// 获取Broker端的消费组订阅信息
SubscriptionGroupConfig subscriptionGroupConfig =
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
data.getGroupName());
boolean isNotifyConsumerIdsChangedEnable = true;
// 如果Broker端的消费组订阅信息不为空,说明当前可能要修改消费者订阅信息
if (null != subscriptionGroupConfig) {
// 是否通知到所有的消费者 订阅信息变更
isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
int topicSysFlag = 0;
if (data.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
}
// 消费失败后的,消息消费重试队列、名为:%RETRY%groupName
String newTopic = MixAll.getRetryTopic(data.getGroupName());
// 创建消息消费重试队列
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
}
// 注册消费者订阅信息到Broker中,并判断订阅信息是否变更
boolean changed = this.brokerController.getConsumerManager().registerConsumer(
data.getGroupName(),
clientChannelInfo,
data.getConsumeType(),
data.getMessageModel(),
data.getConsumeFromWhere(),
data.getSubscriptionDataSet(),
isNotifyConsumerIdsChangedEnable
);
// 消费者信息发生变更,则打印日志记录
if (changed) {
log.info("registerConsumer info changed {} {}",
data.toString(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel())
);
}
}
// 处理心跳包中的生产者信息
for (ProducerData data : heartbeatData.getProducerDataSet()) {
// 直接注册producer,并把Producer的ClientChannelInfo保存下来、用于后面与Producer通信
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
clientChannelInfo);
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
对于Producer的心跳信息处理非常简单,直接注册Producer、并把Producer的ClientChannelInfo保存下来、用于后面与Producer通信;
对于Consumer的心跳信息处理稍微复杂一点:
1、先判断Broker端中是否存在消费者的订阅信息:
1)如果存在,创建一个用于消息重试消费的topic(%RETRY%groupName)
2、注册消费者订阅信息到Broker中;
3、判断消费者订阅信息是否变更、消费者的clientChannel(所在机器IP+Port)是否变更;只要有一个变更,就会通知当前消费组下的所有消费者实例 消费组中有信息变更。
最后我们看一下ConusumerManager#registerConsumer()注册消费者信息的源码:
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
final Set subList, boolean isNotifyConsumerIdsChangedEnable) {
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
// 消费者初次上报订阅信息时
if (null == consumerGroupInfo) {
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}
// 消费者的clientChannel(机器)是否变更
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
// 消费者的订阅信息是否变更
boolean r2 = consumerGroupInfo.updateSubscription(subList);
// 若消费者的机器变更 或 订阅信息变更,则通知所有的消费者实例
if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
// 消费者信息发生变更,则返回true
return r1 || r2;
}
源码注释出处
以上所有分析相关的源码注释请见GitHub中的release-4.8.0分支:https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0



