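Contents

- Questions
- Broker receive flow diagram
- Message reception
  - 1. Entry point `NettyRemotingServer.start()`
  - 2. Creating the NettyServer communication channel
  - 3. Core Netty receive handler `NettyServerHandler`
    - 3.1 Request handling `processRequestCommand`
    - 3.2 Send-message request processor `SendMessageProcessor`
    - 3.3 Message storage `DefaultMessageStore`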
Questions

1. After a Producer sends a message, how does the Broker receive it?
2. How does the Broker process the message once it has been received?
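In the earlier article "RocketMQ Source Code Analysis, Broker Part: The Broker Startup Process", we saw that starting the broker calls BrokerStartup.start(). RocketMQ uses Netty for its low-level communication, so the broker also receives messages through Netty and then processes them.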
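1. Entry point NettyRemotingServer.start()

NettyRemotingServer.start() is invoked as part of BrokerStartup.start(): that method calls BrokerController.start(), which in turn starts the remoting server.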
    // Excerpt from BrokerController.start()
    public void start() throws Exception {
        if (this.messageStore != null) {
            this.messageStore.start();
        }
        if (this.remotingServer != null) {
            // NettyRemotingServer.start()
            this.remotingServer.start();
        }
    }
2. Creating the NettyServer communication channel
    @Override
    public void start() {
        // Thread pool that runs the handshake, codec, and business handlers
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
                }
            });

        // Create the handlers that are shared by all channels
        prepareSharableHandlers();

        ServerBootstrap childHandler =
            this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
                .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
                .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        // Pipeline order: handshake, encoder, decoder, idle detection,
                        // connection management, then the request handler
                        ch.pipeline()
                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                            .addLast(defaultEventExecutorGroup,
                                encoder,
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                                connectionManageHandler,
                                serverHandler
                            );
                    }
                });

        if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
            childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
        }

        // Bind the listen port and record the port actually in use
        try {
            ChannelFuture sync = this.serverBootstrap.bind().sync();
            InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
            this.port = addr.getPort();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }

        // Run scanResponseTable every second, after an initial delay of 3 seconds
        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingServer.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);
    }
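The timer at the end of start() calls scanResponseTable once per second after a 3 second delay. The excerpt does not show what that method does, so the following is only a minimal sketch of what a periodic scan of this kind might look like, assuming a table of pending requests keyed by opaque id with an absolute deadline. The class name, the field layout, and the expiry rule are all hypothetical and are not taken from RocketMQ.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: not the RocketMQ implementation of scanResponseTable.
final class ResponseTableScanSketch {
    // opaque id -> absolute deadline in milliseconds (assumed layout)
    final Map<Integer, Long> deadlines = new ConcurrentHashMap<>();

    // Records a pending request that must be answered within timeoutMillis.
    void register(int opaque, long nowMillis, long timeoutMillis) {
        deadlines.put(opaque, nowMillis + timeoutMillis);
    }

    // Removes every entry whose deadline has passed and returns how many were dropped.
    int scan(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<Integer, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() <= nowMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }
}
```

Passing the current time in as a parameter keeps the scan deterministic; a timer task such as the one above would simply call scan(System.currentTimeMillis()).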
The sharable handlers used in the pipeline are created in prepareSharableHandlers:
    private void prepareSharableHandlers() {
        handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
        encoder = new NettyEncoder();
        connectionManageHandler = new NettyConnectManageHandler();
        serverHandler = new NettyServerHandler();
    }
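3. Core Netty receive handler NettyServerHandler

NettyServerHandler is built on Netty's ChannelHandler interface (it extends SimpleChannelInboundHandler) and intercepts the messages sent by producers. Readers who want more depth can study the Netty framework separately; here we focus on the processing flow of NettyServerHandler.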
    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
            // Handle the received message
            processMessageReceived(ctx, msg);
        }
    }

    public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        final RemotingCommand cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                // A request sent by the peer
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                // A response to a request this side sent earlier
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            }
        }
    }
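3.1 Request handling processRequestCommand

This method runs the handler that matches the incoming request code. It looks the handler up in processorTable, the map from request code to processor that the broker fills through registerProcessor while the BrokerController is being initialized.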
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
        // Look up the processor and its executor by request code;
        // for a send request this is SendMessageProcessor
        final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
        if (pair != null) {
            Runnable run = new Runnable() {
                @Override
                public void run() {
                    try {
                        // Resolve the remote address
                        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                        // Hooks that run before the request is processed
                        doBeforeRpcHooks(remoteAddr, cmd);
                        // Callback that runs the after-hooks and writes the response back
                        final RemotingResponseCallback callback = response -> {
                            doAfterRpcHooks(remoteAddr, cmd, response);
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                }
                            }
                        };
                        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1();
                            // Process the request asynchronously
                            processor.asyncProcessRequest(ctx, cmd, callback);
                        } else {
                            NettyRequestProcessor processor = pair.getObject1();
                            // Process the request synchronously (SendMessageProcessor)
                            RemotingCommand response = processor.processRequest(ctx, cmd);
                            callback.callback(response);
                        }
                    } catch (Throwable e) {
                        // Error handling omitted in this excerpt
                    }
                }
            };
            // The task is then submitted to the executor registered with the processor
            // (rejection handling omitted in this excerpt)
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        }
    }
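The lookup at the top of processRequestCommand can be sketched in plain Java. This is a simplified illustration under stated assumptions, not the RocketMQ code: the Entry class stands in for the Pair of processor and executor, requests and responses are plain strings, and the "not supported" reply for an unknown code with no default processor is an assumption, since the excerpt does not show that branch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical sketch of a request-code to processor table with a default fallback.
final class ProcessorTableSketch {
    // Stand-in for the (processor, executor) pair
    static final class Entry {
        final Function<String, String> processor;
        final Executor executor;

        Entry(Function<String, String> processor, Executor executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> processorTable = new HashMap<>();
    private Entry defaultEntry;

    void registerProcessor(int requestCode, Function<String, String> processor, Executor executor) {
        processorTable.put(requestCode, new Entry(processor, executor));
    }

    void registerDefaultProcessor(Function<String, String> processor, Executor executor) {
        defaultEntry = new Entry(processor, executor);
    }

    // Picks the processor registered for the code, falls back to the default,
    // and runs it on the executor that was registered with it.
    CompletableFuture<String> dispatch(int requestCode, String body) {
        Entry matched = processorTable.get(requestCode);
        Entry entry = matched == null ? defaultEntry : matched;
        if (entry == null) {
            // Assumed behaviour for this sketch only
            return CompletableFuture.completedFuture("request code " + requestCode + " not supported");
        }
        return CompletableFuture.supplyAsync(() -> entry.processor.apply(body), entry.executor);
    }
}
```

Registering each processor together with its own executor means that a slow processor ties up only the threads assigned to it. In a real server the executor would be a thread pool; a direct executor such as `Runnable::run` is enough to try the sketch.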
3.2 Send-message request processor SendMessageProcessor
    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

        @Override
        public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                              RemotingCommand request) throws RemotingCommandException {
            RemotingCommand response = null;
            try {
                // Block until the asynchronous path has produced the response
                response = asyncProcessRequest(ctx, request).get();
            } catch (InterruptedException | ExecutionException e) {
                log.error("process SendMessage error, request : " + request.toString(), e);
            }
            return response;
        }

        public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                      RemotingCommand request) throws RemotingCommandException {
            final SendMessageContext mqtraceContext;
            switch (request.getCode()) {
                // Retry message sent back by a consumer
                case RequestCode.CONSUMER_SEND_MSG_BACK:
                    return this.asyncConsumerSendMsgBack(ctx, request);
                // Ordinary message sent by a producer
                default:
                    // Build the request header object from the request
                    SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                    if (requestHeader == null) {
                        return CompletableFuture.completedFuture(null);
                    }
                    mqtraceContext = buildMsgContext(ctx, requestHeader);
                    this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                    if (requestHeader.isBatch()) {
                        // Batch of messages
                        return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                    } else {
                        // Single message
                        return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                    }
            }
        }

        // Remaining methods omitted
    }
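processRequest above is a blocking wrapper: it calls asyncProcessRequest and waits on the returned future with get(). The pattern can be sketched with the JDK alone. The class and method names below are hypothetical and requests are plain strings. As in the excerpt, a failure is swallowed and the caller receives null.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical sketch of a synchronous method layered on an asynchronous one.
final class SyncOverAsyncSketch {
    // Asynchronous path: completes later with the response, or exceptionally on bad input.
    static CompletableFuture<String> asyncProcess(String request) {
        if (request == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("empty request"));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> "ack:" + request);
    }

    // Synchronous path: blocks on the future and maps any failure to null.
    static String process(String request) {
        try {
            return asyncProcess(request).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        } catch (ExecutionException e) {
            return null;
        }
    }
}
```

One consequence of returning null on failure is that the response callback in processRequestCommand, which only writes a response when it is non-null, would send nothing back for that request.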
Let us focus on how a single message is handled.
    private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
        // Initialize the response
        final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }
        // Message body
        final byte[] body = request.getBody();
        // Id of the queue the message is sent to
        int queueIdInt = requestHeader.getQueueId();
        // Configuration of the topic the message is sent to
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
        // If the queue id is negative, pick a random queue id
        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }
        // Wrap the data in a MessageExtBrokerInner
        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);
        // Handle retry and dead-letter queues
        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }
        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
        MessageAccessor.setProperties(msgInner, origProps);
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
        if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
            // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
            // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
            String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
            // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
            origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
        } else {
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        }
        CompletableFuture<PutMessageResult> putMessageResult = null;
        String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        // Transactional messages take the transactional path
        if (transFlag != null && Boolean.parseBoolean(transFlag)) {
            // Check whether this broker accepts transactional messages
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
        } else {
            // Ordinary messages are stored through the MessageStore
            putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
        }
        return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
    }

3.3 Message storage DefaultMessageStore
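For now it is enough to know that the message is ultimately stored through the commitLog; the message storage module will be studied and explained in a later article.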
    @Override
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
        // Delegate the write to the CommitLog
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
        return putResultFuture;
    }
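asyncPutMessage returns a future rather than a result, and asyncSendMessage hands that future to handlePutMessageResultFuture to build the response. The excerpt does not show that method, so the following is only a sketch of the general idea, assuming the store result is mapped to a response once the future completes. The in-memory list, the PutStatus enum, and the response strings are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: an in-memory "store" whose async result is mapped to a response.
final class PutResultSketch {
    enum PutStatus { OK, FAILED }

    // Stand-in for the commit log: message bodies in arrival order
    private final List<byte[]> log = new ArrayList<>();

    // Appends the body and reports the outcome through a future.
    CompletableFuture<PutStatus> asyncPut(byte[] body) {
        if (body == null || body.length == 0) {
            return CompletableFuture.completedFuture(PutStatus.FAILED);
        }
        synchronized (log) {
            log.add(body.clone());
        }
        return CompletableFuture.completedFuture(PutStatus.OK);
    }

    // Chains the response construction onto the store future instead of blocking on it.
    CompletableFuture<String> send(byte[] body) {
        return asyncPut(body).thenApply(status -> status == PutStatus.OK ? "SUCCESS" : "SYSTEM_ERROR");
    }

    int storedCount() {
        synchronized (log) {
            return log.size();
        }
    }
}
```

Chaining with thenApply keeps the calling thread free while the write is in flight, which is the point of returning a future from the store in the first place.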
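In summary:
- Initialize the response.
- Get the target queue and the topic configuration; if queueId < 0, pick a queue at random.
- Build the message object.
- Handle retry and dead-letter cases for RETRY-type messages: if the maximum number of consumption attempts has been exceeded, the topic is changed to "%DLQ%" + the group name, which puts the message in the dead-letter queue.
- For a transactional message, check whether the broker is configured to reject transactional messages.
- Store the message in local files through the MessageStore component. Only the CommitLog file is written at this point; the ConsumeQueue and IndexFile files are written asynchronously by background threads.
- Handle the result of storing the message.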
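Two of the steps summarized above, choosing a queue and rerouting to the dead-letter queue, are small enough to sketch in isolation. This is an illustration under assumptions rather than the broker's code: the "%RETRY%" prefix for retry topics, the rule that the group name is whatever follows that prefix, and the use of "greater than or equal to" for the retry limit are all assumptions made for the sketch. Only the "%DLQ%" prefix comes from the text.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of queue selection and dead-letter rerouting.
final class SendRoutingSketch {
    static final String DLQ_PREFIX = "%DLQ%";     // prefix named in the summary
    static final String RETRY_PREFIX = "%RETRY%"; // assumed prefix for retry topics

    // Keeps a non-negative queue id; otherwise picks one at random in [0, writeQueueNums).
    static int resolveQueueId(int requestedQueueId, int writeQueueNums) {
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        return ThreadLocalRandom.current().nextInt(writeQueueNums);
    }

    // Returns the topic to store under: retry topics whose retry budget is used up
    // are redirected to "%DLQ%" + group name; everything else is left unchanged.
    static String resolveTopic(String topic, int reconsumeTimes, int maxReconsumeTimes) {
        if (topic == null || !topic.startsWith(RETRY_PREFIX)) {
            return topic;
        }
        if (reconsumeTimes < maxReconsumeTimes) {
            return topic;
        }
        String group = topic.substring(RETRY_PREFIX.length());
        return DLQ_PREFIX + group;
    }
}
```

For example, with a limit of 16 attempts, a message on `%RETRY%groupA` that has already been consumed 16 times would be stored under `%DLQ%groupA`, while an ordinary topic is never rewritten.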



