栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

《RocketMQ源码分析》Broker是如何处理心跳的?

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

《RocketMQ源码分析》Broker是如何处理心跳的?

一、Broker接收请求

Broker作为服务端接收请求的流程如下图:

接收到请求之后,我们着重看一下NettyRemotingAbstract#processRequestCommand()方法,看一下它是怎么处理客户端的请求的。

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    // 根据Command的Code从processorTable中获取相应的事件处理器和线程池
    final Pair matched = this.processorTable.get(cmd.getCode());

    // 找不到事件处理器,则使用默认的处理器
    final Pair pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    final RemotingResponseCallback callback = new RemotingResponseCallback() {
                        @Override
                        public void callback(RemotingCommand response) {
                            doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                            if (!cmd.isOnewayRPC()) {
                                if (response != null) {
                                    response.setOpaque(opaque);
                                    response.markResponseType();
                                    try {
                                        ctx.writeAndFlush(response);
                                    } catch (Throwable e) {
                                        log.error("process request over, but response failed", e);
                                        log.error(cmd.toString());
                                        log.error(response.toString());
                                    }
                                } else {
                                }
                            }
                        }
                    };
                    if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {
                        // 从 pair 中拿到 Processor事件处理器
                        AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor) pair.getObject1();
                        // 处理请求
                        processor.asyncProcessRequest(ctx, cmd, callback);
                    } else {
                        NettyRequestProcessor processor = pair.getObject1();
                        RemotingCommand response = processor.processRequest(ctx, cmd);
                        callback.callback(response);
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
                RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

因为ClientManageProcessor是AsyncNettyRequestProcessor的子类,所以会走asyncProcessRequest()方法异步处理请求。

在AsyncNettyRequestProcessor#asyncProcessRequest()中会调用子类的processRequest()方法实现:

Broker处理心跳也就在ClientManageProcessor#processRequest()中:

二、Broker处理心跳

上面提到了Broker处理心跳是在ClientManageProcessor#heartBeat()方法中处理的,下面我们来看一下heartBeat()方法:

public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
    ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
        ctx.channel(),
        heartbeatData.getClientID(),
        request.getLanguage(),
        request.getVersion()
    );

    // 处理心跳包中的消费者信息
    for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
        // 获取Broker端的消费组订阅信息
        SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
                data.getGroupName());
        boolean isNotifyConsumerIdsChangedEnable = true;
        // 如果Broker端的消费组订阅信息不为空,说明当前可能要修改消费者订阅信息
        if (null != subscriptionGroupConfig) {
            // 是否通知到所有的消费者 订阅信息变更
            isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
            int topicSysFlag = 0;
            if (data.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            // 消费失败后的,消息消费重试队列、名为:%RETRY%groupName
            String newTopic = MixAll.getRetryTopic(data.getGroupName());
            // 创建消息消费重试队列
            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
        }

        // 注册消费者订阅信息到Broker中,并判断订阅信息是否变更
        boolean changed = this.brokerController.getConsumerManager().registerConsumer(
            data.getGroupName(),
            clientChannelInfo,
            data.getConsumeType(),
            data.getMessageModel(),
            data.getConsumeFromWhere(),
            data.getSubscriptionDataSet(),
            isNotifyConsumerIdsChangedEnable
        );

        // 消费者信息发生变更,则打印日志记录
        if (changed) {
            log.info("registerConsumer info changed {} {}",
                data.toString(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel())
            );
        }
    }

    // 处理心跳包中的生产者信息
    for (ProducerData data : heartbeatData.getProducerDataSet()) {
        // 直接注册producer,并把Producer的ClientChannelInfo保存下来、用于后面与Producer通信
        this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
            clientChannelInfo);
    }
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

对于Producer的心跳信息处理非常简单,直接注册Producer、并把Producer的ClientChannelInfo保存下来、用于后面与Producer通信;

对于Consumer的心跳信息处理稍微复杂一点:

1、先判断Broker端中是否存在消费者的订阅信息:

1)如果存在,创建一个用于消息重试消费的topic(%RETRY%groupName)

2、注册消费者订阅信息到Broker中;
3、判断消费者订阅信息是否变更、消费者的clientChannel(所在机器IP+Port)是否变更;

只要有一个变更,就会通知当前消费组下的所有消费者实例 消费组中有信息变更。

最后我们看一下ConusumerManager#registerConsumer()注册消费者信息的源码:

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set subList, boolean isNotifyConsumerIdsChangedEnable) {

    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    // 消费者初次上报订阅信息时
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    // 消费者的clientChannel(机器)是否变更
    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    // 消费者的订阅信息是否变更
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    // 若消费者的机器变更 或 订阅信息变更,则通知所有的消费者实例
    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
        }
    }

    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    // 消费者信息发生变更,则返回true
    return r1 || r2;
}
源码注释出处

以上所有分析相关的源码注释请见GitHub中的release-4.8.0分支:https://github.com/Saint9768/rocketmq/tree/rocketmq-all-4.8.0

你们的每一个赞和关注都是博主创作的动力

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/592377.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号