栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

精通RocketMQ系列:含泪1.5万字深度剖析RocketMQ消费者start启动流程源码

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

精通RocketMQ系列:含泪1.5万字深度剖析RocketMQ消费者start启动流程源码

一、概述

RocketMQ的消息消费包含两种模式:推push和拉pull。对于拉模式官方已经不推荐使用,所以我们主要介绍推模式。
特别说明:本文的源码基于RocketMQ4.8。

二、Push模式启动流程 1、consumer代码片段
package com.example.demo.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;


public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // topic , 过滤器 * 表示不过滤
        consumer.subscribe("saint-study-topic", "*");
        consumer.setConsumeTimeout(20L);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 消息传播模式
        consumer.setMessageModel(MessageModel.CLUSTERING);

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                // ack机制
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer start。。。。。。");
    }
}
2、确定启动流程入口

在consumer.start()行我们F7进入方法发现它所有的逻辑都是DefaultMQPushConsumerImpl类中的start()方法中做的,从这里我们可以确定入口就是DefaultMQPushConsumerImpl#start()。


到这肯定有很多小机灵鬼要问了,这个traceDispatcher是干嘛用的?满脸黑人问号。

从注释中我们看出,它是用来异步传输数据的,默认情况下它是null,也就是说正常我们使用不到它,所以不需要专门花费过多经历去看它。

3、启动流程逻辑

接着上面,我们继续F7步入方法,可以看到此时consumer服务的状态处于CREATE_JUST,然后我们继续深入剖析一把start()方法的内部,拔开它的底裤。

秉持了广大网友的习惯,我们先把源码和相应注释贴出来,方便大家先了解一下。
其实大家看RocketMQ相对新点的版本会发现,注释就像是珍稀动物一下,那是真的少。可想而知在开源之前大家大部分都是中文注释,开源了中文注释指定不能留,外国人看不懂。所以自己啃吧,好在RocketMQ的设计上很贴近中国人的思维,没那么多设计模式;相对比较好理解。

1)DefaultMQPushConsumerImpl#start()方法
public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                // 0、将消费者服务状态预设置为 "启动失败"
                this.serviceState = ServiceState.START_FAILED;

                // 1、校验一堆配置,例如:consumerGroup配置规则、消息传播方式不能为null(默认为集群消费--CLUSTERING)、并发消费线程数量。
                this.checkConfig();

                // 2、copy订阅关系,监听重投队列%RETRY%TOPIC。
                this.copySubscription();

                // 3、如果消息传播方式是集群模式,将消费者实例的name 修改为PID
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                }

                // 4、初始化MQ客户端连接工厂,此处的MQClientManager使用了饿汉式单例模式
                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                // 5、 消息重新负载消费
                // 指定消费组
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                // 指定消息传播方式
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                // 队列分配算法,指定如何将消息队列分配给每个使用者客户端。
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                // 指定MQClient工厂
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

                // 6、指定Pull模型请求包装器
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                // 注册消息过滤钩子
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

                // 7、指定消费进度(偏移量)
                if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        // 广播模式offset保存在本地
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                            // 集群模式offset保存在服务器
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                }
                this.offsetStore.load();

                // 8、创建消费服务
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    // 顺序消费
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    // 并行消费
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }
                // 启动消费服务--定时任务
                this.consumeMessageService.start();

                // 向broker注册自己(consumer)
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();
                log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                // 将consumer的状态修改为 "运行中"
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }
        // 从nameServer中获取监听的topic路由信息,若变更则修改。
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        // 检查消费者是否注册到broker中
        this.mQClientFactory.checkClientInBroker();
        // 向所有broker发送心跳信息、并上传FilterClass的源文件
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        // 唤醒ReBalance服务线程
        this.mQClientFactory.rebalanceImmediately();
    }

我们刚启用一个consumer的时候,consumer客户端的状态是CREATE_JUST,在Switch case逻辑中,当serverState是CREATE_JUST时,会执行以下逻辑:

(1)将消费者服务状态预设置为 “启动失败”。这个操作相信很多看过JUC的源码的大佬都会记得,JUC坐着道格.李老爷子的编程习惯:先预置状态,后续逻辑成功直接提交,否者就回滚。

(2)然后我们真正进入RocketMQ的启动流程,第一步是很常规的校验操作,校验一堆配置,比如:consumerGroup配置规则、消息传播方式不能为null(默认为集群消费–CLUSTERING)、并发消费线程数量等。感兴趣的老哥可以自己跟进去,你会发现这个逻辑太像我们平时写的代码了。

(3)第二步:复制copy订阅关系,监听重投队列%RETRY%TOPIC。这一步对于我们整体consumer的启动流程来讲意义不大,所以不要专进入、不要专进入、不要专进入。重要的事说三遍。

(4)第三步:判断消息传播方式是否为集群模式,是就将消费者实例的name 修改为PID。

(5)第四步:初始化MQ客户端连接工厂–MQClientManagerFactory,进而初始化MQClient,此处的MQClientManager使用了饿汉式单例模式。MQClientInstance封装了 RocketMQ 网络处理 API,是消息生产者、消息消费者与 NameServer、Broker 打交道的网络通道。另外:同一个 JVM 中的不同消费者和不同生产者在启动时获取到的 MQClientInstance 实例都是同一个

(6)第五步:指定消息重新重新负载的相关配置,比如:消费组、消息传播方式、队列负载策略、MQClient工厂等。

(7)第六步:指定创建Pull模型请求包装器(PullAPIWrapper),它是拉取Broker消息的API操作包装器。

(8)第七步:指定消息消费进度OffsetStore对象,初始化消息消费进度。集群模式下消息消费进度offset保存在broker、广播模式下消息消费进度offset保存在client消费者端,即本地文件中。如果是广播模式,紧接着会从本磁盘中加载消费进度文件。

从这里我们可以看到本地的文件的命名规则为:RocketMQ运行目录 / MQClientInstance的ID / groupName / offsets.json

(9)第八步:根据是否为顺序消费创建ConsumeMessageOrderlyService实现或ConsumeMessageConcurrentlyService实现的不同ConsumeMessageService对象并开启消费消息服务----这是个定时任务。consumeMessageService主要负责消息消费,内部维护一个线程池,可以通过参数配置最大和最小核心线程数、注意的它的阻塞队列是无界的。

(10)第九步:consumer向broker注册自己,注册失败则将consumer服务实例的状态回滚到CREATE_JUST,并将已经启动的消费消息的定时任务取消。否则将consumer的状态修改为RUNNING,并启动MQClientInstance。

嚯,启动MQClientInstance都干了什么呢?

卧槽,居然有注释。我们看一下它的意思:

public void start() throws MQClientException {

    synchronized (this) {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                // 如果nameserver地址为空,会去`http:// + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP`获取,
                // WS_DOMAIN_NAME由配置参数rocketmq.namesrv.domain设置,WS_DOMAIN_SUBG由配置参数rocketmq.namesrv.domain.subgroup设置
                if (null == this.clientConfig.getNamesrvAddr()) {
                    this.mQClientAPIImpl.fetchNameServerAddr();
                }
                // 开启请求和响应通道,即远程通信服务,生产者和消费者客户端处理消息发送和消费的API。
                this.mQClientAPIImpl.start();
                
                this.startScheduledTask();
                // 启动消息拉取服务
                this.pullMessageService.start();
                // 启动负载均衡服务
                this.rebalanceService.start();
                // 启动producer消息推送服务
                this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                log.info("the client factory [{}] start OK", this.clientId);
                this.serviceState = ServiceState.RUNNING;
                break;
            case START_FAILED:
                throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
            default:
                break;
        }
    }
}
  1. 开启MQClientAPIImpl远程通信服务,生产者和消费者客户端处理消息发送和消费的API。
  2. 开启各种各样的定时任务,比如定时拉取最新的nameServer、broker、topic信息,向broker发送心跳,持久化offset和调整线程池数量等。
  3. 开启从Broker拉取消息服务,供消费端消息消费。
  4. 开启消费者和消费队列关于消息消费的负载均衡服务。

这地方展开了说会很多,我们后面专门聊一下broker相关的内容。

可能大家会困惑这个serviceState的状态不是修改为了START_FAILED吗?这边不就抛异常直接退出了!!!!注意这里的serviceState是MQClientInstance自己的,而不是上文说的DefaultMQPushConsumerImpl中的那个serviceState。

后面无论consumer的状态是什么都会执行:

(11)第十步:从namesrv获取topic路由信息,若变更则修改。


(12)第十一步:向broker端校验客户端,检查client是否注册到broker。


(13)第十二步:向所有broker发送心跳信息、并上传FilterClass的源文件给FilterServer。


(14)第十二步:唤醒ReBalance服务线程,立即负载队列。

对于(11)—(14)步的详细介绍我们放在下一篇消费者subscribe流程中介绍。

三、Pull模式启动流程

Pull模式的启动流程主要体现在DefaultMQPullConsumerImpl类中,下面我们贴出其start()方法:

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;

            this.checkConfig();

            this.copySubscription();

            if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                this.defaultMQPullConsumer.changeInstanceNameToPID();
            }

            this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);

            this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());
            this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());
            this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());
            this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);

            this.pullAPIWrapper = new PullAPIWrapper(
                mQClientFactory,
                this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());
            this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

            if (this.defaultMQPullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();
            } else {
                switch (this.defaultMQPullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);
            }

            this.offsetStore.load();

            boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;

                throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }

            mQClientFactory.start();
            log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The PullConsumer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }

}

从代码上来看,几乎和DefaultMQPushConsumerImpl类的start()方法一样,区别在于DefaultMQPullConsumerImpl类的start()方法中在switch case逻辑后多了如下片段:

// 从nameServer中获取监听的topic路由信息,若变更则修改。
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 检查消费者是否注册到broker中
this.mQClientFactory.checkClientInBroker();
// 向所有broker发送心跳信息、并上传FilterClass的源文件
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒ReBalance服务线程
this.mQClientFactory.rebalanceImmediately();
四、总结 1、几个关键类的作用

DefaultMQPushConsumerImpl是供客户端进行消息消费的,它创建了ConsumeMessageService消息消费服务、消息进度保存对象OffsetStore,消息消费的监听器对象MessageListener。

MQClientInstance开启了请求和响应通道、即远程通信服务;开启了消息拉取服务PullMessageService、从Broker拉取消息;负载均衡服务RebalanceService,给consumer和消息队列做负载均衡。

另外:DefaultMQPushConsumerImpl和MQClientInstance都是部署在客户端的;像从Broker拉取消息,消息队列的负载均衡都是在客户端完成的。

2、consumer启动流程关键点

主要就是检查配置参数;
获取MQClientInstance;
给重新负载服务设置消费组、消息传播模式、负载策略等属性,
创建pullAPIWrapper采用长轮询的方式拉取消息;
根据消息传播方式加载offsetStore;
根据是否为顺序消费选择对应的ConsumerMessageService消费服务并启动;
启动MQClientInstance、给broker心跳、唤醒ReBalance服务线程–立即负载队列。

最后立个flag:流程图的补充,在2021年10月8号之前补上;时序图的补充,在2021年10月9号之前补上。
至此国庆欠了四天的课,嗨皮完回归学习状态。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/294400.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号