栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

5、RocketMQ 源码解析之 命名服务启动

5、RocketMQ 源码解析之 命名服务启动

在 RocketMQ 当中,消息发送方以及消息接收方都是配置命名服务(Name Server)的地址。通过命名服务解耦合了消息发送者以及消息接收方,不同于 Kafka 直接连接 Broker 地址。命名服务的主要功能包含:Broker 管理以及消息的路由管理。具有如下:

  • Broker 管理,NameServer接受来自 Broker 集群的注册,并提供心跳机制来检查Broker是否活着
  • 路由管理,每个NameServer将保存关于Broker集群的整个路由信息和供客户端查询的 queue (队列) 信息。

源码分析基于 RocketMQ - 4.9.3

1、NameServer 启动整体流程

从之前 3、RocketMQ 源码解析之 源代码环境搭建 这篇文章中我可以看到命名服务的启动类是:NamesrvStartup。而 命名服务最重要的类其实是 NamesrvController,它控制着命名服务的整个流程。命名服务启动其实就是调用NamesrvController的三个方法的过程:

  • NamesrvController#:通过有参构建器传入 NamesrvConfig 以及 NettyServerConfig 这两个类,创建NamesrvController对象。
  • NamesrvController#initialize:调用该方法对 NamesrvController对象进行属性初始化。
  • NamesrvController#start: 调用该方法完成对 NameServer 的启动,启动 NettyRemotingServer 暴露 Netty 服务端的 Socket 服务。

下面我们就来详细的分析一下这三个过程。

2、NamesrvController 对象构建

NamesrvController.init

    public NamesrvController(NamesrvConfig namesrvConfig, NettyServerConfig nettyServerConfig) {
        this.namesrvConfig = namesrvConfig;
        this.nettyServerConfig = nettyServerConfig;
        this.kvConfigManager = new KVConfigManager(this);
        this.routeInfoManager = new RouteInfoManager();
        this.brokerHousekeepingService = new BrokerHousekeepingService(this);
        this.configuration = new Configuration(
            log,
            this.namesrvConfig, this.nettyServerConfig
        );
        this.configuration.setStorePathFromConfig(this.namesrvConfig, "configStorePath");
    }

在 NamesrvController 对象构建里面最核心的还是创建 RouteInfoManager 对象,这个对象里面保存了就完成了 Broker 管理 以及 消息的路由管理 。

下面我们来看一下这个对象里面的核心字段:

  • HashMap> topicQueueTable :消息主题以及对应的 队列数据映射,在 RocketMQ 当中,一个消息主题可以发送不到同的 Queue 当中,达到负载均衡的目的。
  • HashMap brokerAddrTable :broker 名称以及 Broker 数据的映射信息。
  • HashMap> clusterAddrTable:集群名称以及对应的 Broker 名称列表
  • HashMap brokerLiveTable:broker 地址对应 Broker 通道(Channel) 的对应信息。
  • HashMap> filterServerTable:broker 地址以及消息的各种过滤机制。

这里包含了 Broker 对应的所有元数据信息。Broker 每过一段时间(默认 30 秒)就会向 NameServer 发送心跳,告诉 NameServer 我当前还是存活的。并且如果心跳超过 2 分钟没有发送就会把这个 Broker 从上面的 brokerLiveTable 列表当中移除。

RouteInfoManager#scanNotActiveBroker

    public void scanNotActiveBroker() {
        Iterator> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

上面就是 Broker 存活检测逻辑。

3、NamesrvController 初始化

下面我们来看一下 的初始化逻辑。

NamesrvController#initialize

    public boolean initialize() {

        this.kvConfigManager.load();

        this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

        this.remotingExecutor =
            Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));

        this.registerProcessor();

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
            // Register a listener to reload SslContext
            try {
                fileWatchService = new FileWatchService(
                    new String[] {
                        TlsSystemConfig.tlsServerCertPath,
                        TlsSystemConfig.tlsServerKeyPath,
                        TlsSystemConfig.tlsServerTrustCertPath
                    },
                    new FileWatchService.Listener() {
                        boolean certChanged, keyChanged = false;
                        @Override
                        public void onChanged(String path) {
                            if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                                log.info("The trust certificate changed, reload the ssl context");
                                reloadServerSslContext();
                            }
                            if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                                certChanged = true;
                            }
                            if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                                keyChanged = true;
                            }
                            if (certChanged && keyChanged) {
                                log.info("The certificate and private key changed, reload the ssl context");
                                certChanged = keyChanged = false;
                                reloadServerSslContext();
                            }
                        }
                        private void reloadServerSslContext() {
                            ((NettyRemotingServer) remotingServer).loadSslContext();
                        }
                    });
            } catch (Exception e) {
                log.warn("FileWatchService created error, can't load the certificate dynamically");
            }
        }

        return true;
    }

上面的逻辑其实挺简单的,主要的逻辑如下:

  • kvConfigManager KV 类型的配置管理器从配置文件中加载配置
  • 创建 Netty 远程服务 NettyRemotingServer 暴露命名服务 Socket 通信
  • 创建一个远程线程池 remotingExecutor 用于异步处理客户端的 Socket 连接请求,默认没 8 个工作线程。
  • registerProcessor():向 NettyRemotingServer 远程服务端注册请求处理器(DefaultRequestProcessor,这个处理器我们稍后在分析),并且使用上一步创建的 remotingExecutor进行异步处理
  • 使用单个线程池调用 RouteInfoManager#scanNotActiveBroker 扫描并删除 2 分钟没有心跳的 Broker 以及 KVConfigManager#printAllPeriodically 打印kvConfigManager KV 类型的配置管理器里面的配置值。
  • 如果服务器开启了 SSL 加密传输就启动 FileWatchService 定时扫描文件更新 SSL

DefaultRequestProcessor 其实就是整个 NameServer 处理 Socket 请求的类.其实最终就是操作 KVConfigManager 这个配置类或者 RouteInfoManager 这个类。

4、NamesrvController 启动

NamesrvController 的启动就相对比较简单了。

    public void start() throws Exception {
        this.remotingServer.start();

        if (this.fileWatchService != null) {
            this.fileWatchService.start();
        }
    }

它的只包括了两个步骤:

  • NettyRemotingServer#start,调用 Netty 的服务端 ServerBootstrap 进行服务端启动暴露 Socket 服务,默认是 9876;
  • 启动 FileWatchService 服务,当 SSL 有更新时,就更新 SSL 配置。
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/698607.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号