栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

NameServer

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

NameServer

NameServer集群结构图

 

NamesrvStartup: NameServer的启动类;NamesrvController: NameServer的核心控制类;KVConfigManager: 读取或变更NameServer的配置属性,加载NamesrvConfig中配置的配置文件到内存;KVConfigSerializeWrapper: NameServer配置信息序列化包装类;RouteInfoManager: NameServer数据的载体,记录Broker,Topic等信息;DefaultRequestProcessor: NameServer处理请求的请求类,负责处理所有与NameServer交互的请求;BrokerHousekeepingService: BrokerHouseKeepingService实现ChannelEventListener接口,可以说是通道在发送异常时的回调方法(Nameserver与Broker的连接通道在关闭、通道发送异常、通道空闲时);NamesrvConfig: NamesrvConfig,主要指定nameserver的相关配置目录属性;NettyRemotingServer: Netty服务类;

NameServer启动

NameServer的启动类是NamesrvStartup

先创建

NamesrvConfig(业务参数)和NettyServerConfig(网络参数),然后解析配置类 

public class NettyServerConfig implements Cloneable {
    // 监听端口,默认端口为9876
    private int listenPort = 8888;
    // 业务线程池线程个数
    private int serverWorkerThreads = 8;
    // Netty public 任务线程池个数。Netty网络会根据业务类型创建不同的线程池
    private int serverCallbackExecutorThreads = 0;
    // I/O线程池线程个数,主要为解析NameServer,Broker端解析请求
    private int serverSelectorThreads = 3;
    // 消息请求并发度
    private int serveronewaySemaphorevalue = 256;
    // 异步消息发送的最大并发度
    private int serverAsyncSemaphorevalue = 64;
    // 网络连接最大空闲时间,默认120s
    private int serverChannelMaxIdleTimeSeconds = 120;
    // Socket发送缓冲区大小,默认64KB
    private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    // 接受缓存区大小,默认64KB
    private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    // 是否开启缓存
    private boolean serverPooledByteBufAllocatorEnable = true;
}

NameServer路由注册,故障剔除

NameServer主要作用是为消息产生者和消息消费者提供关于topic的路由信息

路由元信息

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // topic消息队列路由信息,发送消息时根据路由表进行负载均衡
    private final HashMap> topicQueueTable;
    
    // brokerName,所属集群名称,主备Broker地址
    private final HashMap brokerAddrTable;

    // Broker集群信息,存储集群中所有Broker名称
    private final HashMap> clusterAddrTable;

    // Broker状态信息,NameServer每次收到心跳信息时会替换该信息
    private final HashMap brokerLiveTable;

    // FilterServer列表,用户消息过滤
    private final HashMap> filterServerTable;
}

路由注册

路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳语句,每个30s向集群中所有NameServer发送心跳包,NameServer收到Broker的心跳包会先更新BrokerLiveTable缓存中的BrokerLiveInfo的lastUpdateTimestamp,然后每隔10s扫描一次brokerLiveTable,如果连续120s没有收到心跳包,NameServer将Broker的路由信息移除,同时关闭Socket连接。

对于Zookeeper、Etcd这样强一致性组件,数据只要写到主节点,内部会通过状态机将数据复制到其他节点,Zookeeper使用的是Zab协议,etcd使用的是raft协议。

但是NameServer节点之间是互不通信的,无法进行数据复制。RocketMQ采取的策略是,在Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。

同时,Broker节点为了证明自己是存活的,会将最新的信息上报给NameServer,然后每隔30秒向NameServer发送心跳包,心跳包中包含 BrokerId、Broker地址、Broker名称、Broker所属集群名称等等,然后NameServer接收到心跳包后,会更新时间戳,记录这个Broker的最新存活时间。

NameServer在处理心跳包的时候,存在多个Broker同时操作一张Broker表,为了防止并发修改Broker表导致不安全,路由注册操作引入了ReadWriteLock读写锁,这个设计亮点允许多个消息生产者并发读,保证了消息发送时的高并发,但是同一时刻NameServer只能处理一个Broker心跳包,多个心跳包串行处理。这也是读写锁的经典使用场景,即读多写少。

Broker发送心跳包

路由注册时序图:

路由删除

Nameserver 启动时会创建一个定时任务,定时删除不活跃的 Broker。

public void scanNotActiveBroker() {
    Iterator> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // 如果当前时间大于最后修改时间加上Broker过期时间,那么就剔除该Broker
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // 关闭Broker对应的channel
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            // 从brokerLiveTable、brokerAddrTable、topicQueueTable移除Broker相关信息
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}

 删除 Broker 信息的逻辑是,首先从 BrokerLiveInfo 获取状态信息,判断 Broker 的心跳时间是否已超过限定值,若超过之后就执行删除逻辑。

link:

RocketMq技术内幕

RocketMQ NameServer工作原理与源码解析

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/749347.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号