Broker源码解析:
启动流程:
BrokerStartup
main():
start(BrokerController controller=createBrokerController(args)):
createBrokerController:
1:是否指定了netty通信时缓冲区的大小,若未指定初始化为128K
2:解析命令行参数,加载-C参数文件中指定的配置信息,防止丢失这里记录的是配置文件中的全部配置信息。使用读写锁中的写锁
3:执行BrokerController.initialize()初始化方法
boolean result = this.topicConfigManager.load();
result = result && this.consumerOffsetManager.load();
result = result && this.subscriptionGroupManager.load();
result = result && this.consumerFilterManager.load();
1:加载broker中的topic信息(topic.json文件中取出topic相关信息)
2:加载消费进度信息consumerOffset.json
3:加载订阅组subscriptionGroup.json
4:加载消费过滤信息consumerFilter.json
如果上述信息均加载成功加载消息信息messageStore.load()
1:加载commitLog 使用mmap加载commitlog信息this.mappedFileQueue.load(); delayOffset.json
2: 加载消费队列信息consumequeue
如果配置均加载成功
1:创建nettyserver this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);
2:创建配置nettyserver his.fastRemotingServer = new NettyRemotingServer(fastConfig, this.clientHousekeepingService);
3:创建发送消息线程池 固定线程池空闲60s(线程池配置为broker配置中指定)以下线程池均是如此
4:创建拉取消息线程池
5:创建消息回复线程池
6:创建消息查询线程池
7:创建adminBroker线程池
8:创建clientManager线程池
9:创建心跳线程池
10:创建结束事务线程池
11:创建消费线程池
12:注册处理器this.registerProcessor();为不同的请求类型指定线程池,即为上面创建的线程池
1:发送消息处理器
2:拉取消息处理器
3:响应消息处理器
4:查询消息处理器
5:客户端管理处理器
6:消费管理处理器
7:结束事务处理器
8:管理broker处理器AdminBrokerProcessor
13:定时统计broker的状态信息(间隔1天统计一次)
14:定时持久化消费状态信息(时长根据broker中的配置而定)定时向consumerOffset.json文件中写入消费者偏移量
15:定时持久化消费过滤信息(时长根据broker中的配置而定)定时向consumerFilter.json文件写入消费者过滤器信息
16:定时(每隔三分钟)定时禁用消费慢的consumer,保护Broker(消费堆积,即消费读取慢的情况下是否关闭消费,根据消费者组进行关闭)取决与是否配置了消费慢时停止消费即isDisableConsumeIfConsumerReadSlowly
17:每隔1s打印水印 定时打印Send、Pull、Query、Transaction队列信息
18:每隔一分钟日志打印 定时打印已存储在提交日志中但尚未调度到消费队列的字节数
19:如果命名空间地址不为空更新命名空间地址列表,如果为空则判断是否获取命名空间地址isFetchNamesrvAddrByAddressServer,如果是则定时(每隔两分钟)获取命名空间地址列表
20:判断是否是高可用(主从)模式 如果为从则更新主服务的地址,因为主服务器地址可能会由于切换而改变,如果broker为主则定时打印主从差异,即从broker落后主broker的字节数
21:是否是同tls协议模式 是:注册一个监听器检查ssl的信息检查证书是否发生变化
通过SPI方式实现22,23,24
22:initialTransaction();加载事务需要的实例
23:initialAcl();创建acl的权限检查
24:initialRpcHooks();会注册配置了的RPC钩子
4:BrokerController.start():
1:判断相关服务对象是否为空,不为空则启动(消息存储,远程服务,文件监听,消费过滤,客户端保持,broker外放api)
2:判断是否是高可用模式是的话则启动高可用处理器,并且处理从库异步复制,注册所有broker BrokerController.registerBrokerAll(true, false, true);
3:定时注册所有broker BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
1:向服务器nameserv发送心跳包
2:向brokerOuterApi发送信息this.brokerOuterAPI.registerBrokerAll():(头部:集群名,broker地址,broker名称,brokerId,Ha服务地址;请求体:主体配置信息,过滤服务器配置信息,校验)
向多个nameserv发送使用CDL CountDownLatch
3:BrokerOuterAPI.registerBroker 使用默认的请求处理器,根据请求码不同处理不同的请求,判断是注册broker
broker发送netty请求到nameserv服务端,nameserv使用注册的处理器进行处理
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
topicConfigWrapper,
null,
ctx.channel()
);
4:根据响应结果更新HAmaster地址,如果为空则设置Master的地址
5:注册优雅关闭(synchronized (this)同步操作)记录关闭耗时 在 JVM 进程关闭之前, 先将线程池关闭, 及时释放资源