栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RocketMQ 源码分析—Broker启动流程与注册原理

RocketMQ 源码分析—Broker启动流程与注册原理

文章较长,建议先收藏再看

文章目录
  • 一、创建 BrokerController
    • 1.1 初始化核心配置类
    • 1.2.加载配置文件信息到配置类
    • 1.3 设置 masterId
    • 1.4 创建BrokerController
  • 二、BrokerController初始化
    • 2.1 Broker初始化过程
      • 2.1.1 加载 topic、consumer 等磁盘配置信息
      • 2.1.2 消息存储组件DefaultMessageStore
        • 初始化消息存储组件
        • 加载本地消息信息
      • 2.1.3 创建 Netty 网络组件NettyRemotingServer
      • 2.1.4 初始化Broker 端各类线程池
      • 2.1.5 注册各类业务处理器
      • 2.1.6 开启定时任务
      • 2.1.7 初始化事务、ACL 和 RpcHooks
  • 三、BrokerController 启动
    • 3.1 启动消息存储组件
      • 3.1.1 启动 commitLog 分发
      • 3.1.2 启动HA 高可用和延迟消息服务
      • 3.1.3 文件刷盘
      • 3.1.4 创建 abort 文件
      • 3.1.5 删除过期commitLog 和 consumeQueue任务
    • 3.2 注册 Broker ★
      • 构造请求参数
      • 发送Netty网络请求
      • NameServer 处理注册 Broker 请求
    • 注册 Broker 条件
  • 总结

上一篇文章中介绍了 NameServer 的启动流程,并介绍了 NameServer 启动涉及到的几个核心点:

  • 加载NameServer 配置类和 NettyServer配置类,创建核心组件 NamesrcController
  • Controller 内初始化各个数据,包括 Netty 网络处理类、工作线程池、路由管理器、两个定时任务,一个用于每10s扫描不活跃的 Broker 从 NameServer 端移除,一个每10min 打印一次 KV 配置
  • 路由管理器中维护了 Broker、集群、topic 队列等元数据信息
  • 还有一个网络请求处理器,专门处理NameServer 端收到的各类网络请求
  • 以及一个优雅关闭线程池的钩子函数

这一节将围绕 Broker 端的启动流程进行讲解。

在梳理源码前,先大概思考下 Broker 启动时会做什么,结合 NameServer 启动流程,可以猜测:

  • 加载 Broker 端的配置;
  • 初始化 Broker 端的一些基础组件和线程池
  • Broker 启动后需要将自己注册到 NameServer,因此会向 NameServer 发送一个网络请求;
  • Broker 会每30s发送心跳给 NameServer,并把自己的信息传递给 NameServer,维护在路由管理器中;

能想到的暂时就这些了,那么一起来看看吧。

如果想快速了解启动流程,可以直接看文末的总结部分。

Broker 的启动类为org.apache.rocketmq.broker.BrokerStartup

public static void main(String[] args) {
        start(createBrokerController(args));
    }

    public static BrokerController start(BrokerController controller) {
            //K1 Controller启动
            controller.start();
          	 .....
            return controller;
    }

和 NameServer 一样,也是分两大步,第一步创建 BrokerController,第二步调用 Controller 的启动方法

一、创建 BrokerController

该部分主要完成了 Broker 端的各类配置的加载和基础组件、线程池的创建工作。

1.1 初始化核心配置类
				 final BrokerConfig brokerConfig = new BrokerConfig();
            final NettyServerConfig nettyServerConfig = new NettyServerConfig();
            final NettyClientConfig nettyClientConfig = new NettyClientConfig();
            //TLS加密相关
           nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
            //Netty服务端的监听端口10911
            nettyServerConfig.setListenPort(10911);
            //K2 这个明显是Broker用来存储消息的一些配置信息。
            final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
            //如果是SLAVE,会设置一个参数。
            if (BrokerRole.SLAVE == messageStoreConfig.getBrokerRole()) {
                int ratio = messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;
               // 该参数为消息在内存中的最大比率,主从同步会用到,具体可以去官网了解下
                messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);
            }

可以看到,Broker 端也有三个核心配置类:

  • BrokerConfig

    保存了 Broker 的基础信息,如 ip、name、集群名等,以及 Broker 端各种线程池的默认线程数等等非常多,这些大家可以自己去源码中看一下;

  • NettyServerConfig

    保存 Nerrty 服务端的一些网络配置,如工作线程数、监听端口等,和 NameServer 端的对应的 Netty 配置类差不多;

  • NettyClientConfig

    Netty 客户端相关的网络配置信息。如客户端的工作线程数、连接超过时间等等

  • MessageStoreConfig

    Broker 存储消息的配置类。定义了默认的存储路径(user.home/store)、commitLog 路径以及其他与存储相关的默认配置;另外消息延迟时间等价也在这里定义了:

        private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
1.2.加载配置文件信息到配置类

接着就是解析命令行参数,如我们 Broker 启动时会通过-c conf/broker.conf指定配置文件启动:

 if (commandLine.hasOption('c')) {
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    configFile = file;
                    InputStream in = new BufferedInputStream(new FileInputStream(file));
                    properties = new Properties();
                    properties.load(in);

                    properties2SystemEnv(properties);
                    MixAll.properties2Object(properties, brokerConfig);
                    MixAll.properties2Object(properties, nettyServerConfig);
                    MixAll.properties2Object(properties, nettyClientConfig);
                    MixAll.properties2Object(properties, messageStoreConfig);

                    BrokerPathConfigHelper.setBrokerConfigPath(file);
                    in.close();
                }
            }

这里就会解析配置文件内容,将对应的配置加载到对应的几个配置类,可以简单看下加载完的几个配置类内容:

BrokerConfig 的:

如 NameServer 地址、ip、名字为 broker-a、集群名、brokerId、默认 topic 下的队列数量为4、开启自动创建主题还有一堆没有截图;

存储相关:

我们修改了默认的存储路径,改到了当前源码的主目录:

1.3 设置 masterId

这里可以看到,其会根据 broker 的角色,设置当前 Broker 的 brokerId,MASTER 的 brokerId=0,SLAVE 的 brokerId 必须大于0:

  switch (messageStoreConfig.getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    brokerConfig.setBrokerId(MixAll.MASTER_ID);
                    break;
                case SLAVE:
                    if (brokerConfig.getBrokerId() <= 0) {
                        System.out.printf("Slave's brokerId must be > 0");
                        System.exit(-3);
                    }

                    break;
                default:
                    break;
            }
 if (messageStoreConfig.isEnableDLegerCommitLog()) {
                brokerConfig.setBrokerId(-1);
            }

另外,如果开启了 Dledger 技术,则 brokerId 默认为-1

接着是日志相关的配置就不看了。

1.4 创建BrokerController

这里首先会将上面的四个核心配置保存到 BrokerController,然后会进行一些 Broker 端的一些基本组件的初始化工作:

  public BrokerController(
        final BrokerConfig brokerConfig,
        final NettyServerConfig nettyServerConfig,
        final NettyClientConfig nettyClientConfig,
        final MessageStoreConfig messageStoreConfig
    ) {
        //四个核心组件保存起来。
        this.brokerConfig = brokerConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.nettyClientConfig = nettyClientConfig;
        this.messageStoreConfig = messageStoreConfig;
        //Broker的各种功能对应的组件。
        //管理consumer消费offset
        this.consumerOffsetManager = new ConsumerOffsetManager(this);
        //Topic配置管理器
        this.topicConfigManager = new TopicConfigManager(this);
        //处理Consumer拉取消息的请求的
        this.pullMessageProcessor = new PullMessageProcessor(this);
     		//推送模式相关组件
        this.pullRequestHoldService = new PullRequestHoldService(this);
     		//消息到达监听器(推模式下Broker 收到消息时,会主动推送消息给 consumer),
     	//	这里用到了pullRequestHoldService,也可以大概知道,RocketMQ 的推模式也是由拉模式来实现的
        this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);

        this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
        //消费者管理器
     		this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
        // 消费过滤相关的管理器(这里可以验证,RocketMQ 的消息过滤,是在 Broker 端实现的)
     	   this.consumerFilterManager = new ConsumerFilterManager(this);
     	//生产者管理器
        this.producerManager = new ProducerManager();
        this.clientHousekeepingService = new ClientHousekeepingService(this);
     	// 提供了检查事务状态的方法
        this.broker2Client = new Broker2Client(this);
        this.subscriptionGroupManager = new SubscriptionGroupManager(this);
     		//对外的 API,可以看成一个 Netty 客户端
        this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
     	// 用来过滤消息的 Server
        this.filterServerManager = new FilterServerManager(this);

        this.slaveSynchronize = new SlaveSynchronize(this);

     	  // 各类线程池队列,看名知意
     		// 发消息的队列
        this.sendThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getSendThreadPoolQueueCapacity());
        this.pullThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getPullThreadPoolQueueCapacity());
        this.replyThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getReplyThreadPoolQueueCapacity());
        this.queryThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getQueryThreadPoolQueueCapacity());
        this.clientManagerThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
        this.consumerManagerThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
     		//心跳的线程池队列
        this.heartbeatThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
        this.endTransactionThreadPoolQueue = new linkedBlockingQueue(this.brokerConfig.getEndTransactionPoolQueueCapacity());
        this.brokerStatsManager = new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());
        this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));
		// 快速失败机制相关
        this.brokerFastFailure = new BrokerFastFailure(this);
     
     	//保存全部的 Config
        this.configuration = new Configuration(
            log,
            BrokerPathConfigHelper.getBrokerConfigPath(),
            this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
        );
    }

看起来是不是特别多,仔细看一下可以分为三大部分:Broker 端的核心组件、Broker 端的各类线程池队列、以及最后的 Configuration 类,保存四个配置的配置信息,这里我们大概看下有哪些组件有个印象,后面涉及到具体的功能时再去看。

二、BrokerController初始化

还是要强调下,这里我们只关注 Broker 初始化的流程,做了哪些事这个主线,但是具体这些事是如何实现的暂时不关注,否则东拉西扯太混乱,仅对一些重要的点进行跟入,如 Broker 的注册、心跳发送等。

2.1 Broker初始化过程

入口:BrokerController#initialize()

2.1.1 加载 topic、consumer 等磁盘配置信息
 boolean result = this.topicConfigManager.load();
        result = result && this.consumerOffsetManager.load();
        result = result && this.subscriptionGroupManager.load();
        result = result && this.consumerFilterManager.load();

这几个方法其实加载的是 store/config 目录下的几个 json 配置信息到自己的 ConfigManager 管理器中

2.1.2 消息存储组件DefaultMessageStore 初始化消息存储组件

1.配置加载成功后会初始化消息存储管理器

  //消息存储管理组件,管理磁盘上的消息的。
                this.messageStore =
                    new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                        this.brokerConfig);
                //如果启用了Dledger
                if (messageStoreConfig.isEnableDLegerCommitLog()) {
                   //初始化一堆Dledger相关的组件
                  .....
                }
                //broker的统计组件
                this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                //load plugin
               ...........

DefaultMessageStore中初始化了各种和消息存储相关的服务组件:

	//消息存储相关配置    private final MessageStoreConfig messageStoreConfig;    //CommitLog    private final CommitLog commitLog;    //消息队列缓存    private final ConcurrentMap> consumeQueueTable;    //消息队列文件刷盘线程(继承了 Runnable 接口)    private final FlushConsumeQueueService flushConsumeQueueService;    //清除CommitLog文件服务    private final CleanCommitLogService cleanCommitLogService;    //清除ConsumeQueue文件服务    private final CleanConsumeQueueService cleanConsumeQueueService;    //索引服务    private final IndexService indexService;    //分配MappedFile服务    private final AllocateMappedFileService allocateMappedFileService;    //分发commitLog消息,根据CommitLog文件来构建ConsumeQueue和indexFile文件。    private final ReputMessageService reputMessageService;    //HA机制服务    private final HAService haService;    //延迟发送消息的服务    private final ScheduleMessageService scheduleMessageService;    //存储状态服务    private final StoreStatsService storeStatsService;    //消息堆外内存缓存    private final TransientStorePool transientStorePool;    private final RunningFlags runningFlags = new RunningFlags();    private final SystemClock systemClock = new SystemClock();	// 用于创建几个定时任务    private final ScheduledExecutorService scheduledExecutorService =        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("StoreScheduledThread"));    //Broker状态管理器    private final BrokerStatsManager brokerStatsManager;    // 就是前面的那个消息到达的监听器,注入进来的    private final MessageArrivingListener messageArrivingListener;    //Broker配置类    private final BrokerConfig brokerConfig;    //checkpoint 相关    private StoreCheckpoint storeCheckpoint;    //CommitLog文件转发请求    private final linkedList dispatcherList;	// 磁盘检查香瓜你的定时任务线程池    private final ScheduledExecutorService diskCheckScheduledExecutorService =            Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("DiskCheckScheduledThread"));
加载本地消息信息
result = result && this.messageStore.load();//org.apache.rocketmq.store.DefaultMessageStore#loadpublic boolean load() {        boolean result = true;        try {            //1.根据abort临时文件判断服务是否正常关闭。            boolean lastExitOK = !this.isTempFileExist();            //2.延迟消息相关服务和信息加载。            if (null != scheduleMessageService) {                result = result && this.scheduleMessageService.load();            }            //加载Commit Log            result = result && this.commitLog.load();            //加载Consume Queue            result = result && this.loadConsumeQueue();            if (result) {               	// checkpoint初始化                this.storeCheckpoint =                    new StoreCheckpoint(StorePathConfigHelper.getStoreCheckpoint(this.messageStoreConfig.getStorePathRootDir()));                this.indexService.load(lastExitOK);						//3.文件恢复                this.recover(lastExitOK);            }        } ......        return result;    }

从上面源码中可看到,主要做了以下几个事情:

  1. 根据store 下的 abort 文件是否存在,来判断上次是否正常关闭(不存在表示正常关闭)

  2. 初始化延迟消息服务相关的内容

  3. 加载本地的 commit_log 和 consume_queue 文件

    将本地的每个 commitlog 文件中的信息加载到MappedFile中;

    将本地的 consumequeue 文件数据保存到ConsumeQueue消费队列中,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。

  4. 加载本地的checkPoint文件

  5. 加载索引信息

    加载本地的 index 目录下的索引文件保存到IndexFile中,IndexFile提供了可以通过key或时间区间来查询消息的方法

  6. 文件恢复相关

    根据 commitlog 来处理,具体如何恢复暂不关注。

这里看下延迟消息相关:

//org.apache.rocketmq.store.schedule.ScheduleMessageService#load public boolean load() {        boolean result = super.load();        result = result && this.parseDelayLevel();        return result;    }

两件事:

  1. 加载本地的delayOffset.json文件

    加载后的数据保存到ScheduleMessageService的offsetTable中,保存了每个延迟等级下的消息offset 数据【延迟消息部分细讲】

     private final ConcurrentMap offsetTable =        new ConcurrentHashMap(32);
    
  2. 解析延迟等级

    将默认的几个延迟等级延迟的毫秒数计算出来,保存到delayLevelTable中:

    private final ConcurrentMap delayLevelTable =        new ConcurrentHashMap(32);
    
2.1.3 创建 Netty 网络组件NettyRemotingServer

初始化 Netty 服务端的通信组件和,和 NameServer 端一样。

2.1.4 初始化Broker 端各类线程池

都是初始化的工作,这里就不列源码了,线程池就是1.4中列举的几个线程队列对应的业务线程池,大概有以下几个线程池:

  • 发送消息
  • 处理 consumer拉消息
  • 回复消息
  • 查询消息
  • 处理命令行相关
  • 客户端管理
  • 心跳发送
  • 事务消息结束
  • consumer 管理
2.1.5 注册各类业务处理器

以发消息为例:

SendMessageProcessor sendProcessor = new SendMessageProcessor(this);        sendProcessor.registerSendMessageHook(sendMessageHookList);        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);     
  • SendMessageProcessor就是一个业务处理器,这里是发送消息的处理器

  • 将每个请求 Code 对应的处理器和处理线程池注册到NettyRemotingServer父类NettyRemotingAbstract的processorTable中,结构如下:

      protected final HashMap> processorTable =        new HashMap>(64);
    

这样在处理每个请求的时候,就可以根据 RequestCode拿到对应的处理器和线程池发送网络请求,这里也和 NameServer 端对应上了,NameServer 端会根据收到的请求的RequestCode进行不同的业务处理。

处理器的接口为RocketMQ 自己设计的NettyRequestProcessor

public interface NettyRequestProcessor {    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)        throws Exception;}
2.1.6 开启定时任务

因为都是正常的开启定时任务的代码this.scheduledExecutorService.scheduleAtFixedRate就不放源码了,这里开启的定时任务主要有以下几个:

  • broker 数据定时统计

  • consumerOffset 数据定时持久化到磁盘

  • consumerFilter 数据定时持久化到磁盘

  • broker 保护

  • 打印水位线

     public void printWaterMark() {        LOG_WATER_MARK.info("[WATERMARK] Send Queue Size: {} SlowTimeMills: {}", this.sendThreadPoolQueue.size(), headSlowTimeMills4SendThreadPoolQueue());        LOG_WATER_MARK.info("[WATERMARK] Pull Queue Size: {} SlowTimeMills: {}", this.pullThreadPoolQueue.size(), headSlowTimeMills4PullThreadPoolQueue());        LOG_WATER_MARK.info("[WATERMARK] Query Queue Size: {} SlowTimeMills: {}", this.queryThreadPoolQueue.size(), headSlowTimeMills4QueryThreadPoolQueue());        LOG_WATER_MARK.info("[WATERMARK] Transaction Queue Size: {} SlowTimeMills: {}", this.endTransactionThreadPoolQueue.size(), headSlowTimeMills4EndTransactionThreadPoolQueue());    }
    
  • 打印commitLog分发结果

    消息存储器里的ReputMessageService会根据 commitlog 文件构建逻辑消费队列 consumerQueue 和索引文件 indexFIle

2.1.7 初始化事务、ACL 和 RpcHooks
  				initialTransaction();            initialAcl();//权限相关            initialRpcHooks();//ACL相关的 RPC 钩子

我们主要看下和事务相关的初始化工作:

 private void initialTransaction() {        this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);        if (null == this.transactionalMessageService) {            this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));        }        this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);        if (null == this.transactionalMessageCheckListener) {            this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();        }        this.transactionalMessageCheckListener.setBrokerController(this);        this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);    }
  • 事务消息服务类TransactionalMessageService
  • 事务消息回查服务TransactionalMessageCheckService
  • 事务消息检查监听类AbstractTransactionalMessageCheckListener

可以看到,这里都是基于 SPI 机制加载对应的实现类,如果没有配置的话,则使用对应默认的实现类。

因此我们可以自己进行扩展,把实现类路由放到对应的接口文件中

    public static final String TRANSACTION_SERVICE_ID = "meta-INF/service/org.apache.rocketmq.broker.transaction.TransactionalMessageService";    public static final String TRANSACTION_LISTENER_ID = "meta-INF/service/org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener";
三、BrokerController 启动

org.apache.rocketmq.broker.BrokerController#start

这部分是将第二部分初始化的一些核心组件进行启动工作以及 Broker 的心跳注册任务。

(1)启动核心组件

 		//1.启动消息存储组件        if (this.messageStore != null) {            this.messageStore.start();        }        //启动了两个Netty服务器        if (this.remotingServer != null) {            this.remotingServer.start();        }        if (this.fastRemotingServer != null) {            this.fastRemotingServer.start();        }        //文件相关的服务【略】         if (this.fileWatchService != null) {            this.fileWatchService.start();        }        //2.Broker的一个Netty客户端。Broker 的注册就是在这里进行请求的        if (this.brokerOuterAPI != null) {            this.brokerOuterAPI.start();        }        					//拉取相关的服务启动        if (this.pullRequestHoldService != null) {            this.pullRequestHoldService.start();        }			        if (this.clientHousekeepingService != null) {            this.clientHousekeepingService.start();        }			//过滤器启动        if (this.filterServerManager != null) {            this.filterServerManager.start();        }

这里只看下消息存储组件启动时做了什么,其他的先不看。

3.1 启动消息存储组件

这里就是将消息存储组件中的几个核心服务进行启动,如延迟消息服务、commitLog、刷盘、还有一些和消息存储相关的定时任务,如定时删除过期消息、定时删除过期的 commitlog和 consumeQueue等。

3.1.1 启动 commitLog 分发
  //K2 Broker启动时会启动一个线程来更新ConsumerQueue索引文件。            this.reputMessageService.setReputFromOffset(maxPhysicalPosInLogicQueue);            this.reputMessageService.start();

start 中主要就是调用thread.start 方法,因此Broker 在启动的时候就在这开启了一个线程处理 commitLog 的分发(收到消息后根据 commitlog文件去构造ConsumeQueue 和IndexFile),具体处理在ReputMessageService的 run()方法,紧接着调用了 doReput方法:

 public void run() {            while (!this.isStopped()) {              ...                    this.doReput();			    ...        }

知道收到消息后会在这里进行分发即可,后面讲到发送消息的时候在看

3.1.2 启动HA 高可用和延迟消息服务
  if (!messageStoreConfig.isEnableDLegerCommitLog()) {            this.haService.start();            this.handleScheduleMessageService(messageStoreConfig.getBrokerRole());        }

this.haService.start();这里会启动 HA 高可用服务,后面讲到主备的时候再看。

 public void handleScheduleMessageService(final BrokerRole brokerRole) {        if (this.scheduleMessageService != null) {            if (brokerRole == BrokerRole.SLAVE) {                this.scheduleMessageService.shutdown();            } else {                this.scheduleMessageService.start();            }    }    }

这里是具体的启动延迟消息服务:

public void start() {   		//CAS 加锁        if (started.compareAndSet(false, true)) {            this.timer = new Timer("ScheduleMessageTimerThread", true);            for (Map.Entry entry : this.delayLevelTable.entrySet()) {                Integer level = entry.getKey();                Long timeDelay = entry.getValue();                Long offset = this.offsetTable.get(level);                if (null == offset) {                    offset = 0L;                }                if (timeDelay != null) {                    //开启定时去执行延迟消息处理任务                    this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);                }            }            //2.每隔10秒将延迟消息持久化到硬盘中。            this.timer.scheduleAtFixedRate(new TimerTask() {                @Override                public void run() {                    try {                        if (started.get()) ScheduleMessageService.this.persist();                    } catch (Throwable e) {                        log.error("scheduleAtFixedRate flush exception", e);                    }                }            }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());        }    }

两件事:

  1. 遍历延迟等级表,对于每个延迟等级,开启一个定时任务去处理该等级下的延迟队列的消息(从 offsetTable中取)【后面将延迟消息发送再细看】
  2. 再开个定时任务,每10s 将延迟消息刷新到磁盘,就是初始化消息存储组件时同步到offsetTable的store/config/delayOffset.json文件
3.1.3 文件刷盘
this.flushConsumeQueueService.start();this.commitLog.start();

consumeQueue 的刷盘处理位置:org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush

commitLog 的刷盘处理位置:org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run

这里会将内存中的commitlog 和 consumeQueue数据刷新到磁盘,具体等后面刷盘的文章讲。

3.1.4 创建 abort 文件
  this.storeStatsService.start();//存储相关统计数据线程开启        this.createTempFile();//创建 abort 文件
3.1.5 删除过期commitLog 和 consumeQueue任务
this.addScheduleTask();
    private void addScheduleTask() {        //定时删除过期消息        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {            @Override            public void run() {                DefaultMessageStore.this.cleanFilesPeriodically();            }        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);        .....           //下面还有一些数据检查的任务,就不看了    }

默认10s 删除一次过期消息。

private void cleanFilesPeriodically() {        //定时删除过期commitlog        this.cleanCommitLogService.run();        //定时删除过期的consumequeue        this.cleanConsumeQueueService.run();    }
3.2 注册 Broker ★
//会先调用一次注册,第三个参数为是否强制注册this.registerBrokerAll(true, false, true);//然后开启定时任务30s 发送一次心跳this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {            @Override            public void run() {                try {                   //这里每次发送心跳根据 broker 的配置决定是否强制注册                    BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());                } catch (Throwable e) {                    log.error("registerBrokerAll Exception", e);                }            }                    }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);

默认30s 会发送一次心跳。

public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {        //Topic配置        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();       ...        //如果位配置强制注册,则每次需要先判断是否需要注册        if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),            this.getBrokerAddr(),            this.brokerConfig.getBrokerName(),            this.brokerConfig.getBrokerId(),            this.brokerConfig.getRegisterBrokerTimeoutMills())) {            doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);        }    }

needRegister的逻辑最后再看,先看具体的注册业务:

private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,        TopicConfigSerializeWrapper topicConfigWrapper) {        //具体的注册        List registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(...参数看后面的代码);        //如果注册结果的数量大于0,那么就对结果进行处理        if (registerBrokerResultList.size() > 0) {            RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);            if (registerBrokerResult != null) {                //主节点地址                if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {                    this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());                }                //从节点地址                this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());                if (checkOrderConfig) {                    this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());                }            }        }    }

通过brokerOuterAPI进行注册,返回注册结果列表(因为可能开启多个 NameServer)RegisterBrokerResult,然后对返回结果进行处理,更新地址信息。

主要看注册过程:

构造请求参数
 public List registerBrokerAll(    		//broker 相关参数        final String clusterName,        final String brokerAddr,        final String brokerName,        final long brokerId,        final String haServerAddr,        final TopicConfigSerializeWrapper topicConfigWrapper,        final List filterServerList,        final boolean oneway,        final int timeoutMills,        final boolean compressed) {        //存放每个NameServer注册结果        final List registerBrokerResultList = Lists.newArrayList();        //获取NameServer的地址列表        List nameServerAddressList = this.remotingClient.getNameServerAddressList();        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {            //Broker注册的请求参数            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();            requestHeader.setBrokerAddr(brokerAddr);//broker 地址            requestHeader.setBrokerId(brokerId);//brokerId            requestHeader.setBrokerName(brokerName);//brokerName            requestHeader.setClusterName(clusterName);//集群名            requestHeader.setHaServerAddr(haServerAddr);//Ha时slave地址            requestHeader.setCompressed(compressed);            //注册的请求体            RegisterBrokerBody requestBody = new RegisterBrokerBody();            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);            requestBody.setFilterServerList(filterServerList);            final byte[] body = requestBody.encode(compressed);//编码            final int bodyCrc32 = UtilAll.crc32(body);//使用循环冗余校验【可以回顾下计组知识哦】            requestHeader.setBodyCrc32(bodyCrc32);            for (final String namesrvAddr : nameServerAddressList) {                brokerOuterExecutor.execute(new Runnable() {                    @Override                    public void run() {                        try {                            //执行注册                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);                            if (result != null) {                               //保存注册结果                                registerBrokerResultList.add(result);                            }                        }...                    }                });            }          ...        }        return registerBrokerResultList;    }

首先遍历 NameServer 列表,然后构建请求体,绑定请求参数,最后进行注册:

终于来到了最终发送网络请求的地方了:

 private RegisterBrokerResult registerBroker(        final String namesrvAddr,        final boolean oneway,        final int timeoutMills,        final RegisterBrokerRequestHeader requestHeader,        final byte[] body    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,        InterruptedException {        //1.构建远端的网络请求,这里传入了请求类型REGISTER_BROKER        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);        request.setBody(body);        ......        //2.Netty客户端发送网络请求        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);        //封装网络请求结果。        assert response != null;        switch (response.getCode()) {            case ResponseCode.SUCCESS: {                RegisterBrokerResponseHeader responseHeader =                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);               //3.封装返回结果                RegisterBrokerResult result = new RegisterBrokerResult();                result.setMasterAddr(responseHeader.getMasterAddr());                result.setHaServerAddr(responseHeader.getHaServerAddr());                if (response.getBody() != null) {                    result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));                }                return result;            }            default:                break;        }        //不成功抛异常        throw new MQBrokerException(response.getCode(), response.getRemark());    }

注:

  • 构建的请求 Command 传入了请求 Code:RequestCode.REGISTER_BROKER,NaemServer 端就根据该 Code 进行此类请求的处理

看一下 Netty 客户端发送请求的代码:

发送Netty网络请求
//org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)        throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {        long beginStartTime = System.currentTimeMillis();        //1.创建与 NameServer 的连接返回的channel        final Channel channel = this.getAndCreateChannel(addr);        if (channel != null && channel.isActive()) {//channel 活跃的            try {                .....                //2.发送网络请求                RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);               ....                return response;            }            .....        } else {         .....        }    }
  1. 建立与 NameServer 的连接,返回 Channel 对象

    其实这是Netty 固定的连接建立流程,就不深入了,可以自己去学习下,但是这里会缓存每个 NameServer 的连接:

     private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException {        if (null == addr) {            return getAndCreateNameserverChannel();        }    	  //从本地缓存拿到与该 NameServer 的连接对象        ChannelWrapper cw = this.channelTables.get(addr);        if (cw != null && cw.isOK()) {            return cw.getChannel();        }        return this.createChannel(addr);    }
    
  2. 通过channel发送网络请求【Netty 发送请求的标准流程,可以不看该部分】

        public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,        final long timeoutMillis)        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {        final int opaque = request.getOpaque();        try {           //1.创建ResponseFuture,在回调中设置返回结果【类型 Java 的 Future 任务】            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);           //保存当前请求的请求 id 和响应结果对象            this.responseTable.put(opaque, responseFuture);            final SocketAddress addr = channel.remoteAddress();            //2.发送请求            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture f) throws Exception {                    if (f.isSuccess()) {                       //监听发送结果,发送成功或失败会触发此回调                        responseFuture.setSendRequestOK(true);                        return;                    } else {                        responseFuture.setSendRequestOK(false);                    }						//移除此次请求记录                    responseTable.remove(opaque);                    responseFuture.setCause(f.cause());                    responseFuture.putResponse(null);                    log.warn("send a request command to channel <" + addr + "> failed.");                }            });            //阻塞等待NameServer 注册结果            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);            if (null == responseCommand) {               //处理失败的情况                if (responseFuture.isSendRequestOK()) {                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,                        responseFuture.getCause());                } else {                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());                }            }            return responseCommand;        } finally {            this.responseTable.remove(opaque);        }    }
    

到这里我们就可以在 NameServer的org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest方法中的case RequestCode.REGISTER_BROKER:位置打断点跟踪 Broker 发送的网络请求了:

NameServer 处理注册 Broker 请求
			case RequestCode.REGISTER_BROKER:					//获取当前 Broker 的版本信息                Version brokerVersion = MQVersion.value2Version(request.getVersion());                if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {                   //版本大于3.0.11的走该逻辑                    return this.registerBrokerWithFilterServer(ctx, request);                } else {                    return this.registerBroker(ctx, request);                }

registerBrokerWithFilterServer中可分为三个步骤:

  1. 解析请求参数(包含crc32的校验)

  2. 调用路由管理器 RouteInfoManager 进行注册;

  3. 封装返回结果

    状态码、master 地址、Ha 服务端地址等

我们主要看路由管理器的注册流程:

public RegisterBrokerResult registerBroker(        final String clusterName,        final String brokerAddr,        final String brokerName,        final long brokerId,        final String haServerAddr,        final TopicConfigSerializeWrapper topicConfigWrapper,        final List filterServerList,        final Channel channel) {   	//注册结果对象        RegisterBrokerResult result = new RegisterBrokerResult();        try {            try {                //加写锁(排他锁)。                this.lock.writeLock().lockInterruptibly();                //1.获取集群下的 broker 列表                Set brokerNames = this.clusterAddrTable.get(clusterName);                if (null == brokerNames) {                   //1.1第一次为空,则初始化后保存                    brokerNames = new HashSet();                    this.clusterAddrTable.put(clusterName, brokerNames);                }					//1.2 添加当前的 brokername进去,因为是 set 所以不用担心重复                brokerNames.add(brokerName);                boolean registerFirst = false;                //2.根据BrokerName获取BrokerData数据                BrokerData brokerData = this.brokerAddrTable.get(brokerName);                if (null == brokerData) {                   //2.1第一次注册,则初始化,并保存当前 broker 信息                    registerFirst = true;                    brokerData = new BrokerData(clusterName, brokerName, new HashMap());                    this.brokerAddrTable.put(brokerName, brokerData);                }                Map brokerAddrsMap = brokerData.getBrokerAddrs();                //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>                //The same IP:PORT must only have one record in brokerAddrTable                //从注释可以看出来,这里主要处理slave 切换为 master 时情况,要把SLAVE 旧的信息移除,切换为新的信息                Iterator> it = brokerAddrsMap.entrySet().iterator();                while (it.hasNext()) {                    Entry item = it.next();                   	//2.2如果当前地址对应的 brokerId 与本地缓存已有的 brokerId 不同,则移除旧的                    if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {                        it.remove();                    }                }					 //2.3 保存新的 brokerid 和 broker地址的映射,返回旧的地址                String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);                registerFirst = registerFirst || (null == oldAddr);                if (null != topicConfigWrapper                    && MixAll.MASTER_ID == brokerId) {                    if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())                        || registerFirst) {//如果是第一次注册或 broker 的 topic 配置变化了的话                        ConcurrentMap tcTable =                            topicConfigWrapper.getTopicConfigTable();                        if (tcTable != null) {                            for (Map.Entry entry : tcTable.entrySet()) {                               //3.遍历每个topic 配置,更新topic 的队列新到到topicQueueTable【topic->List】                                this.createAndUpdateQueueData(brokerName, entry.getValue());                            }                        }                    }                }                //4. 30s一次的心跳注册时,会更新 broker 的活跃信息到brokerLiveTable,保存当前收到心跳最新的时间戳                BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,                    new BrokerLiveInfo(                        System.currentTimeMillis(),                        topicConfigWrapper.getDataVersion(),                        channel,                        haServerAddr));                if (null == prevBrokerLiveInfo) {                    log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);                }                if (filterServerList != null) {                    if (filterServerList.isEmpty()) {                        this.filterServerTable.remove(brokerAddr);                    } else {                       //filterServer 信息更新                        this.filterServerTable.put(brokerAddr, filterServerList);                    }                }                if (MixAll.MASTER_ID != brokerId) {                   //如果是 Slave 的心跳                    String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);                    if (masterAddr != null) {                        BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);                        if (brokerLiveInfo != null) {                           //更新主节点下的SLAVE 节点地址信息                            result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());                            result.setMasterAddr(masterAddr);                        }                    }                }            } finally {               //释放写锁                this.lock.writeLock().unlock();            }        } catch (Exception e) {            log.error("registerBroker Exception", e);        }        return result;    }

梳理下:

路由管理器中维护的 broker 元数据信息在 NameServer 启动流程一文中讲过了,具体的可以返回看一下,这里简单过一下,主要有以下(也是注册和心跳时更新的)路由结构:

  private final HashMap> topicQueueTable;//topic 下的队列元信息    private final HashMap brokerAddrTable;//broker 地址信息    private final HashMap> clusterAddrTable;//集群下的 broker 名称    private final HashMap brokerLiveTable;//broker 地址和对应的活跃信息(包含上一次心跳的时间戳)    private final HashMap> filterServerTable;//过滤服务的信息
  • 上写锁,保证一次只有一个线程修改,但是可以多个线程同时读
  • 第一次注册的时候初始化clusterAddrTable集群路由信息,后面心跳注册则直接添加 brokername
  • 第一次注册时初始化broker 地址路由表brokerAddrTable;
  • 如果SLAVE 切换为 MASTER,则更新brokerAddrTable下的 BrokerData 信息;
  • 如果第一次注册或 topic配置信息变化了,则更新topicQueueTable数据,主要是修改 topic 下的队列元数据信息;
  • 更新 broker 的活跃记录表(初始化新的对象,同时将 Broker 端传来的最新的版本信息 DataVersion 保存下来)brokerLiveTable,记录最新的心跳时间戳,用于NameServer 定时扫描,移除120s 未发送心跳的 Broker 信息;
  • 如果是 SLAVE 的注册请求,则更新其 MASTER 节点的BrokerLiveInfo中的SLAVE 节点的地址;
  • 释放写锁
注册 Broker 条件

以上就是 Broker 注册的全部流程了,最后我们看下一开始注册 Broker 的入口,如果没有配置强制注册,则有一个判断是否需要注册的逻辑,我们看下什么条件才会进行注册 Broker:

   public List needRegister(        final String clusterName,        final String brokerAddr,        final String brokerName,        final long brokerId,        final TopicConfigSerializeWrapper topicConfigWrapper,        final int timeoutMills) {        final List changedList = new CopyOnWriteArrayList<>();        List nameServerAddressList = this.remotingClient.getNameServerAddressList();        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {            for (final String namesrvAddr : nameServerAddressList) {                brokerOuterExecutor.execute(new Runnable() {                    @Override                    public void run() {                        try {                           //封装查询参数                            QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();                            requestHeader.setBrokerAddr(brokerAddr);                            requestHeader.setBrokerId(brokerId);                            requestHeader.setBrokerName(brokerName);                            requestHeader.setClusterName(clusterName);                            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);                            request.setBody(topicConfigWrapper.getDataVersion().encode());                           //请求 NameServer 查询                            RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);                            DataVersion nameServerDataVersion = null;                            Boolean changed = false;                            switch (response.getCode()) {                                case ResponseCode.SUCCESS: {                                   //收到响应,拿到查询结果                                    QueryDataVersionResponseHeader queryDataVersionResponseHeader =                                        (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);                                    changed = queryDataVersionResponseHeader.getChanged();                                    byte[] body = response.getBody();                                    if (body != null) {                                        nameServerDataVersion = DataVersion.decode(body, DataVersion.class);                                        if                                            (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {                                           //如果当前的版本与 NameServer 端不同,则发生了变化                                            changed = true;                                        }                                    }                                    if (changed == null || changed) {                                        changedList.add(Boolean.TRUE);                                    }                                }                                default:                                    break;                            }                        } catch (Exception e) {                           //查询出错则默认改变了,会进行注册                            changedList.add(Boolean.TRUE);                        }                     }                });            }           ....        }        return changedList;    }

其实做的事情很简单,就是向 NameServer发送一个查询 DadaVersion 的请求RequestCode.QUERY_DATA_VERSION,查询 NameServer 端保存的记录的版本信息,如果和当前不同,则说明变化了,changedList只要有一个 TRUE,即任何一个 NameSever 的数据版本出现了不一致,上层的needRegister都会返回 true 进行 Broker 注册。

接着,我们简单看下 NameServer 查询 DataVersion 的过程:

//org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,        RemotingCommand request) throws RemotingCommandException {        final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);        ....请求参数解析...        DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);   		//1.这里会根据 broker 地址从brokerLiveTable从拿到 DataVersion(保存在BrokerLiveInfo中的) 信息   		//如果旧的 DataVersion 为空(第一次查询)或与当前的版本不一致,则认为是发生了变化        Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);        if (!changed) {           //如果没发生变化,则更新当前 broker活跃table 中的时间戳            this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());        }   //2.查询旧的 DataVersion 返回        DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());        response.setCode(ResponseCode.SUCCESS);        response.setRemark(null);        if (nameSeverDataVersion != null) {            response.setBody(nameSeverDataVersion.encode());        }        responseHeader.setChanged(changed);        return response;    }
  1. 首先 DataVersion 是保存在了broker 活跃列表brokerLiveTable的BrokerLiveInfo对象中的
  2. 根据请求的 broker 地址从BrokerLiveInfo中拿到旧 DataVersion 信息,如果和当前不一致,则认为变化了;
  3. 如果没有变化,则更新对应的BrokerLiveInfo的心跳时间戳,此次查询请求就当做一次简单的心跳请求【因为未发生变化的话 Broker 端就不会再发起注册/心跳请求了】
  4. 然后把旧的 DataVersion 信息响应回 Broker
  5. DataVersion 更新时机是在元数据发送变化时Broker 会更新 DataVersion 信息,然后就会在下次心跳的时候更新 NameServer 端的版本
总结

OK,到这里就梳理完了 Broker 的启动流程,我们来简单总结下:

第一大步——初始化工作

  1. 首先加载 Broker 端自身的配置信息和 NettyServer 相关的配置信息(从 broker.conf 中加载),创建了 BrokerController 控制器
  2. BrokerController 创建时会初始化 Broker 端各类基本组件,如topic配置管理器、推和拉模式的相关服务组件、延迟消息服务组件、生产者和消费者管理器以及各类会用到的线程池队列;
  3. 接着会加载磁盘中store/config 下的 json 配置信息,如topic、consumer、consumerFilter等保存到对应的管理器对象中;
  4. 然后再创建很重要的消息存储组件DefaultMessageStore
    • 该组件会初始化和消息存储相关的各类服务和组件,如 索引文件IndexFile、 commitlog 相关服务、consumeQueue相关服务等,具体参考2.1.2
    • 同样,这里会加载 store目录下的 commitlog 文件和 consumequeue 目录下的文件数据保存起来,以及 index 目录下的索引文件保存到 IndexFile 中;
    • 文件恢复工作和延迟消息服务初始化(加载好延迟等级和对应的延迟时间的关系)
  5. 创建 Netty 服务端和各类工作线程池
  6. 同时会创建许多线程池任务
  7. 初始化事务相关服务的实现类(SPI 机制)

第二大步——启动工作

  1. 启动 Broker 端的各个核心组件,最重要的是消息存储组件,在启动的时候其实就是创建各个线程去异步处理一些任务

  2. 启动 commitlog 分发,producer 发送消息后,分发 commitlog,其实就是通过 commitlog 构建 逻辑消费队列consumequeue和索引文件 IndexFile

  3. 启动HA 服务

  4. 开启延迟消息处理服务,处理不同延迟等级的消息;并开启任务定时刷新延迟消息到 store/config/下的delayOffset.json 文件中

  5. 创建定时刷盘的任务(commitlog 和 consumelog 数据刷新到磁盘)

  6. 创建 abort 文件【用于标记是否正常关闭 Broker】

  7. 创建删除过期 commitlog 和 consumequeue 的定时任务

  8. 注册 Broker 和 Broker 心跳发送

    默认30s 发送一次心跳,发送心跳时会携带当前 Broker 的基础信息

其中,涉及到的诸多处理细节的(如消息发送、延迟消息处理、消息刷盘、主从同步等等等等)会在后面文章一点点分析

下一节会梳理 Producer 发送消息的流程。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582199.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号