NamesrvStartup: NameServer的启动类;NamesrvController: NameServer的核心控制类;KVConfigManager: 读取或变更NameServer的配置属性,加载NamesrvConfig中配置的配置文件到内存;KVConfigSerializeWrapper: NameServer配置信息序列化包装类;RouteInfoManager: NameServer数据的载体,记录Broker,Topic等信息;DefaultRequestProcessor: NameServer处理请求的请求类,负责处理所有与NameServer交互的请求;BrokerHousekeepingService: BrokerHouseKeepingService实现ChannelEventListener接口,可以说是通道在发送异常时的回调方法(Nameserver与Broker的连接通道在关闭、通道发送异常、通道空闲时);NamesrvConfig: NamesrvConfig,主要指定nameserver的相关配置目录属性;NettyRemotingServer: Netty服务类;
NameServer启动NameServer的启动类是NamesrvStartup
先创建
NamesrvConfig(业务参数)和NettyServerConfig(网络参数),然后解析配置类
public class NettyServerConfig implements Cloneable {
// 监听端口,默认端口为9876
private int listenPort = 8888;
// 业务线程池线程个数
private int serverWorkerThreads = 8;
// Netty public 任务线程池个数。Netty网络会根据业务类型创建不同的线程池
private int serverCallbackExecutorThreads = 0;
// I/O线程池线程个数,主要为解析NameServer,Broker端解析请求
private int serverSelectorThreads = 3;
// 消息请求并发度
private int serveronewaySemaphorevalue = 256;
// 异步消息发送的最大并发度
private int serverAsyncSemaphorevalue = 64;
// 网络连接最大空闲时间,默认120s
private int serverChannelMaxIdleTimeSeconds = 120;
// Socket发送缓冲区大小,默认64KB
private int serverSocketSndBufSize = NettySystemConfig.socketSndbufSize;
// 接受缓存区大小,默认64KB
private int serverSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
// 是否开启缓存
private boolean serverPooledByteBufAllocatorEnable = true;
}
NameServer路由注册,故障剔除
NameServer主要作用是为消息产生者和消息消费者提供关于topic的路由信息
路由元信息
public class RouteInfoManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
// topic消息队列路由信息,发送消息时根据路由表进行负载均衡
private final HashMap> topicQueueTable;
// brokerName,所属集群名称,主备Broker地址
private final HashMap brokerAddrTable;
// Broker集群信息,存储集群中所有Broker名称
private final HashMap> clusterAddrTable;
// Broker状态信息,NameServer每次收到心跳信息时会替换该信息
private final HashMap brokerLiveTable;
// FilterServer列表,用户消息过滤
private final HashMap> filterServerTable;
}
路由注册
路由注册是通过Broker与NameServer的心跳功能实现的。Broker启动时向集群中所有的NameServer发送心跳语句,每个30s向集群中所有NameServer发送心跳包,NameServer收到Broker的心跳包会先更新BrokerLiveTable缓存中的BrokerLiveInfo的lastUpdateTimestamp,然后每隔10s扫描一次brokerLiveTable,如果连续120s没有收到心跳包,NameServer将Broker的路由信息移除,同时关闭Socket连接。
对于Zookeeper、Etcd这样强一致性组件,数据只要写到主节点,内部会通过状态机将数据复制到其他节点,Zookeeper使用的是Zab协议,etcd使用的是raft协议。
但是NameServer节点之间是互不通信的,无法进行数据复制。RocketMQ采取的策略是,在Broker节点在启动的时候,轮训NameServer列表,与每个NameServer节点建立长连接,发起注册请求。NameServer内部会维护一个Broker表,用来动态存储Broker的信息。
同时,Broker节点为了证明自己是存活的,会将最新的信息上报给NameServer,然后每隔30秒向NameServer发送心跳包,心跳包中包含 BrokerId、Broker地址、Broker名称、Broker所属集群名称等等,然后NameServer接收到心跳包后,会更新时间戳,记录这个Broker的最新存活时间。
NameServer在处理心跳包的时候,存在多个Broker同时操作一张Broker表,为了防止并发修改Broker表导致不安全,路由注册操作引入了ReadWriteLock读写锁,这个设计亮点允许多个消息生产者并发读,保证了消息发送时的高并发,但是同一时刻NameServer只能处理一个Broker心跳包,多个心跳包串行处理。这也是读写锁的经典使用场景,即读多写少。
Broker发送心跳包
路由注册时序图:
路由删除
Nameserver 启动时会创建一个定时任务,定时删除不活跃的 Broker。
public void scanNotActiveBroker() {
Iterator> it = this.brokerLiveTable.entrySet().iterator();
while (it.hasNext()) {
Entry next = it.next();
long last = next.getValue().getLastUpdateTimestamp();
// 如果当前时间大于最后修改时间加上Broker过期时间,那么就剔除该Broker
if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
// 关闭Broker对应的channel
RemotingUtil.closeChannel(next.getValue().getChannel());
it.remove();
log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
// 从brokerLiveTable、brokerAddrTable、topicQueueTable移除Broker相关信息
this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
}
}
}
删除 Broker 信息的逻辑是,首先从 BrokerLiveInfo 获取状态信息,判断 Broker 的心跳时间是否已超过限定值,若超过之后就执行删除逻辑。
link:
RocketMq技术内幕
RocketMQ NameServer工作原理与源码解析



