2021SC@SDUSC
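Continuing from the previous post, this post focuses on the finishInitialization() method.
Master Startup Process: Source Code Analysis
finishInitialization() initializes the master's core functionality. The method is very long and its full code was already listed in the previous post, so it is not repeated here; the relevant parts are quoted as the analysis proceeds.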
1. Set this master's isActiveMaster flag to true
isActiveMaster = true;
2. Set masterActiveTime to the current system time
this.masterActiveTime = System.currentTimeMillis();
3. Initialize MasterFileSystem, which wraps the file-system operations the master commonly needs, such as deleting region, table and column-family directories and checking the file-system status; then create an FSTableDescriptors object
this.fileSystemManager = new MasterFileSystem(this, this, masterRecovery);
this.tableDescriptors =
    new FSTableDescriptors(this.conf, this.fileSystemManager.getFileSystem(),
        this.fileSystemManager.getRootDir());
4. Write the cluster ID to the "hbaseid" ZNode in ZooKeeper, then, when the master is not in recovery mode (masterRecovery is false), initialize ExecutorService and ServerManager. ExecutorService maintains an executor map with one Executor per event type, and asynchronous events are executed by submitting EventHandlers to it. ServerManager manages region server information: it maintains the lists of online and dead region servers and handles region server startups, shutdowns and deaths.
status.setStatus("Publishing Cluster ID in ZooKeeper");
ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId());
if (!masterRecovery) {
  this.executorService = new ExecutorService(getServerName().toShortString());
  this.serverManager = createServerManager(this, this);
}
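To make the "one Executor per event type" idea concrete, here is a minimal Java sketch built on java.util.concurrent rather than on HBase's own ExecutorService class. The EventType enum, MiniExecutorService and its start/submit methods are hypothetical names chosen for illustration; they only model the dispatch-by-type behaviour described above, not HBase's actual API.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical event types; HBase's own executor types are more numerous.
enum EventType { OPEN_REGION, CLOSE_REGION }

// Sketch: one thread pool per event type, handlers dispatched by type.
final class MiniExecutorService {
    private final Map<EventType, ExecutorService> pools = new EnumMap<>(EventType.class);

    // Start a fixed-size pool for one event type.
    void start(EventType type, int threads) {
        pools.put(type, Executors.newFixedThreadPool(threads));
    }

    // Run a handler asynchronously on the pool registered for its type.
    Future<?> submit(EventType type, Runnable handler) {
        ExecutorService pool = pools.get(type);
        if (pool == null) {
            throw new IllegalStateException("No executor started for " + type);
        }
        return pool.submit(handler);
    }

    // Stop accepting work and wait briefly for running handlers.
    void shutdown() throws InterruptedException {
        for (ExecutorService pool : pools.values()) {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        MiniExecutorService svc = new MiniExecutorService();
        svc.start(EventType.OPEN_REGION, 2);
        Future<?> done = svc.submit(EventType.OPEN_REGION,
                () -> System.out.println("handling OPEN_REGION event"));
        done.get(); // wait for the handler to finish
        svc.shutdown();
    }
}
```

Submitting an event whose pool was never started fails fast, which loosely mirrors the idea that each event type needs its executor started before handlers can be submitted.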
5. Call initializeZKbasedSystemTrackers() to initialize the various ZooKeeper-based components
initializeZKbasedSystemTrackers();
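Looking at initializeZKbasedSystemTrackers():
a) Create CatalogTracker, which contains RootRegionTracker and MetaNodeTracker, corresponding to the "/hbase/root-region-server" and "/hbase/unassigned/1028785192" nodes respectively;
b) Create the LoadBalancer, which is responsible for moving regions between region servers, and start LoadBalancerTracker, which tracks the balancer on/off switch in ZooKeeper;
c) Create AssignmentManager, which manages and assigns regions. It also receives region events from ZooKeeper and uses them to bring regions online and offline, close them and open them. It is registered at the head of the ZooKeeper listener list (registerListenerFirst);
d) Create RegionServerTracker, which watches the "/hbase/rs" node and tracks online region servers through ZooKeeper events; when a region server goes offline, the corresponding online regions are removed from ServerManager;
e) Create DrainingServerTracker, which watches the "/hbase/draining" node;
f) Use ClusterStatusTracker, which watches the "/hbase/shutdown" node and maintains the cluster state, to set the cluster-up flag if it was not already set;
g) Create SnapshotManager, which manages HBase snapshots, including taking and restoring them, and register it with MasterProcedureManagerHost, which then loads and initializes the configured procedure managers.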
void initializeZKbasedSystemTrackers() throws IOException,
    InterruptedException, KeeperException {
  this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf, this);
  this.catalogTracker.start();
  // Load balancer plus the tracker for its on/off switch
  this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
  this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
  this.loadBalancerTracker.start();
  this.assignmentManager = new AssignmentManager(this, serverManager,
      this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
      this.tableLockManager);
  // Put AssignmentManager at the head of the ZooKeeper listener list
  zooKeeper.registerListenerFirst(assignmentManager);
  this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
      this.serverManager);
  this.regionServerTracker.start();
  this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this,
      this.serverManager);
  this.drainingServerTracker.start();
  // Set the cluster-up flag if it is not already set
  boolean wasUp = this.clusterStatusTracker.isClusterUp();
  if (!wasUp) this.clusterStatusTracker.setClusterUp();
  LOG.info("Server active/primary master=" + this.serverName +
      ", sessionid=0x" +
      Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
      ", setting cluster-up flag (Was=" + wasUp + ")");
  // Snapshot manager and the other master procedure managers
  this.snapshotManager = new SnapshotManager();
  this.mpmHost = new MasterProcedureManagerHost();
  this.mpmHost.register(this.snapshotManager);
  this.mpmHost.loadProcedures(conf);
  this.mpmHost.initialize(this, this.metricsMaster);
}
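The bookkeeping RegionServerTracker performs when the children of "/hbase/rs" change can be sketched as a set difference: any server that was online before but whose znode is now gone has gone offline. This is a simplified, hypothetical model (RsTrackerSketch and refresh are not HBase names), assuming server names are plain strings of the form host,port,startcode; the real tracker hands such servers to ServerManager instead of returning them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the tracker's view of /hbase/rs children.
final class RsTrackerSketch {
    // Servers currently believed online, e.g. "host,port,startcode".
    private final Set<String> online = new TreeSet<>();

    // Called with the new child list after a children-changed event.
    // Returns the servers whose znodes disappeared, i.e. servers that went offline.
    List<String> refresh(List<String> children) {
        Set<String> current = new TreeSet<>(children);
        List<String> gone = new ArrayList<>();
        for (String server : online) {
            if (!current.contains(server)) {
                gone.add(server); // the real tracker would notify ServerManager here
            }
        }
        online.clear();
        online.addAll(current);
        return gone;
    }

    public static void main(String[] args) {
        RsTrackerSketch tracker = new RsTrackerSketch();
        tracker.refresh(List.of("rs1,60020,1", "rs2,60020,1"));
        System.out.println(tracker.refresh(List.of("rs1,60020,1"))); // [rs2,60020,1]
    }
}
```

Because region server znodes are ephemeral, a crashed server's znode disappears with its ZooKeeper session, which is why a simple "was there, now missing" comparison is enough to detect it.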
6. Initialize MasterCoprocessorHost and call startServiceThreads() to start the various services (both only when the master is not in recovery mode). MasterCoprocessorHost manages all coprocessors related to master operations.
if (!masterRecovery) {
  status.setStatus("Initializing master coprocessors");
  this.cpHost = new MasterCoprocessorHost(this, this.conf);
  spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());
  status.setStatus("Initializing master service threads");
  startServiceThreads();
}
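Looking at startServiceThreads(): it first starts the thread pools that handle six event types, MASTER_OPEN_REGION, MASTER_CLOSE_REGION, MASTER_SERVER_OPERATIONS, MASTER_META_SERVER_OPERATIONS, M_LOG_REPLAY_OPS and MASTER_TABLE_OPERATIONS (the last one with a single thread). It then creates and starts logCleaner and hfileCleaner, starts healthCheckChore if one exists, and finally calls RpcServer's openServer() so that the master starts responding to requests: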
void startServiceThreads() throws IOException {
  // One thread pool per executor type; pool sizes come from the configuration
  this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
      conf.getInt("hbase.master.executor.openregion.threads", 5));
  this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
      conf.getInt("hbase.master.executor.closeregion.threads", 5));
  this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
      conf.getInt("hbase.master.executor.serverops.threads", 5));
  this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
      conf.getInt("hbase.master.executor.logreplayops.threads", 10));
  // Table operations run on a single thread
  this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
  // Cleaner chores run on daemon threads named after the current thread
  String n = Thread.currentThread().getName();
  int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 60 * 1000);
  this.logCleaner =
      new LogCleaner(cleanerInterval,
          this, conf, getMasterFileSystem().getFileSystem(),
          getMasterFileSystem().getOldLogDir());
  Threads.setDaemonThreadRunning(logCleaner.getThread(), n + ".oldLogCleaner");
  Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
  this.hfileCleaner = new HFileCleaner(cleanerInterval, this, conf, getMasterFileSystem()
      .getFileSystem(), archiveDir);
  Threads.setDaemonThreadRunning(hfileCleaner.getThread(), n + ".archivedHFileCleaner");
  if (this.healthCheckChore != null) {
    Threads.setDaemonThreadRunning(this.healthCheckChore.getThread(), n + ".healthChecker");
  }
  // Start accepting RPC requests
  this.rpcServer.openServer();
  this.rpcServerOpen = true;
  if (LOG.isTraceEnabled()) {
    LOG.trace("Started service threads");
  }
}
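The cleaners and the health checker above follow one pattern: a periodic task running on a daemon thread whose name is the current thread's name plus a suffix. Below is a minimal sketch of that pattern, assuming a plain sleep loop is good enough; PeriodicChore, startDaemon and ChoreDemo are hypothetical stand-ins, not HBase's Chore or Threads classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical periodic task: runs chore() every intervalMs until stopped.
abstract class PeriodicChore implements Runnable {
    private final long intervalMs;
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    PeriodicChore(long intervalMs) { this.intervalMs = intervalMs; }

    // One round of work, e.g. deleting expired log files.
    protected abstract void chore();

    @Override
    public void run() {
        while (!stopped.get()) {
            chore();
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void stop() { stopped.set(true); }

    // Similar in spirit to Threads.setDaemonThreadRunning: named daemon thread, started.
    static Thread startDaemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        t.start();
        return t;
    }
}

final class ChoreDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch ran = new CountDownLatch(3);
        PeriodicChore cleaner = new PeriodicChore(10) {
            @Override protected void chore() { ran.countDown(); }
        };
        String n = Thread.currentThread().getName();
        Thread t = PeriodicChore.startDaemon(cleaner, n + ".oldLogCleaner");
        System.out.println(ran.await(1, TimeUnit.SECONDS) + " " + t.getName());
        cleaner.stop();
    }
}
```

Making these threads daemons means they never keep the JVM alive on their own, which suits background housekeeping that should die with the master process.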
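7. Call ServerManager.waitForRegionServers() to wait for the region servers to report in. It returns once all of the following hold:
- at least 4.5 s have elapsed (hbase.master.wait.on.regionservers.timeout)
- at least 1 region server has checked in (hbase.master.wait.on.regionservers.mintostart)
- the number of checked-in region servers has not changed for 1.5 s (hbase.master.wait.on.regionservers.interval), i.e. no region server has joined, died or restarted in that window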
this.serverManager.waitForRegionServers(status);
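The three exit conditions are easier to reason about with a virtual clock. The sketch below replays the polling loop, assuming a 50 ms poll period; WaitForRegionServersSketch and simulate are hypothetical names, countAt stands in for asking ServerManager how many servers have checked in, and the sketch leaves out any stop or logging logic the real method has.

```java
import java.util.function.LongToIntFunction;

// Hypothetical replay of the waitForRegionServers polling loop on a virtual clock.
final class WaitForRegionServersSketch {
    // countAt.applyAsInt(t) = number of region servers checked in at virtual time t (ms).
    // Returns the virtual time (ms since the wait started) at which the loop exits.
    static long simulate(LongToIntFunction countAt, long timeoutMs, long intervalMs,
                         int minToStart, long pollMs) {
        long now = 0;
        long lastCountChange = 0;
        int count = countAt.applyAsInt(now);
        // Keep waiting while the count changed recently, the timeout has not
        // elapsed, or too few servers have checked in.
        while (lastCountChange + intervalMs > now || timeoutMs > now || count < minToStart) {
            if (now > 3_600_000L) {
                // Safety valve for this sketch only: give up after one virtual hour.
                throw new IllegalStateException("region servers never reached minToStart");
            }
            now += pollMs; // stands in for Thread.sleep(pollMs)
            int oldCount = count;
            count = countAt.applyAsInt(now);
            if (count != oldCount) {
                lastCountChange = now;
            }
        }
        return now;
    }

    public static void main(String[] args) {
        // 4500 ms timeout, 1500 ms interval, at least 1 server, 50 ms polling.
        System.out.println(simulate(t -> 1, 4500, 1500, 1, 50));
        System.out.println(simulate(t -> t >= 4000 ? 2 : 1, 4500, 1500, 1, 50));
        System.out.println(simulate(t -> t >= 6000 ? 1 : 0, 4500, 1500, 1, 50));
    }
}
```

With these defaults, a cluster whose only region server has already checked in is released at 4500 ms. If a second server checks in at 4000 ms, the quiet-period rule pushes the return to 5500 ms. If the first server only appears at 6000 ms, the master waits until 7500 ms.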
8. Check for region servers that are up in ZooKeeper but have not yet reported to the master, and register them
for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
  if (!this.serverManager.isServerOnline(sn)
      && serverManager.checkAndRecordNewServer(sn, ServerLoad.EMPTY_SERVERLOAD)) {
    LOG.info("Registered server found up in zk but who has not yet reported in: " + sn);
  }
}
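Step 8 is essentially a reconciliation between two views of the cluster. Here is a hypothetical sketch that uses plain strings for server names and a Set in place of ServerManager's registry (RegisterUnreportedSketch and registerMissing are illustrative names only).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical reconciliation between ZooKeeper's view and the master's registry.
final class RegisterUnreportedSketch {
    // Registers every server that is up in ZooKeeper but not yet known to the
    // master (cf. checkAndRecordNewServer with an empty load) and returns them.
    static List<String> registerMissing(Set<String> zkOnline, Set<String> known) {
        List<String> registered = new ArrayList<>();
        for (String sn : zkOnline) {
            if (known.add(sn)) { // add() returns false if the server was already known
                registered.add(sn);
            }
        }
        return registered;
    }

    public static void main(String[] args) {
        Set<String> known = new LinkedHashSet<>(List.of("rs1,60020,1"));
        Set<String> zk = new LinkedHashSet<>(List.of("rs1,60020,1", "rs2,60020,1"));
        System.out.println(registerMissing(zk, known)); // [rs2,60020,1]
        System.out.println(known);                      // [rs1,60020,1, rs2,60020,1]
    }
}
```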
9. Start the AssignmentManager's timeout monitor (TimeoutMonitor)
if (!masterRecovery) {
  this.assignmentManager.startTimeOutMonitor();
}
10. Call getFailedServersFromLogFolders() to find the servers that have died but still left logs under the WALs directory
Set<ServerName> previouslyFailedServers = this.fileSystemManager.getFailedServersFromLogFolders();
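11. Call removeStaleRecoveringRegionsFromZK() to clean up ZooKeeper for the servers found in the previous step. This does not delete any logs; it removes the stale information (recovering-region entries) that the dead servers left in ZooKeeper, and each of their regions will later be reassigned to a region server.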
this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);
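Conceptually, step 10 compares the WAL subdirectories with the set of online servers. The sketch below assumes, following HBase's WAL directory naming, that each subdirectory is named after its server (host,port,startcode) and may carry a "-splitting" suffix while its logs are being split. FailedServersSketch and failedServers are hypothetical names, and the file-system listing is replaced by a list of directory names.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: which WAL directories belong to servers that are not online?
final class FailedServersSketch {
    // Assumed suffix for WAL directories whose logs are currently being split.
    static final String SPLITTING_SUFFIX = "-splitting";

    static Set<String> failedServers(List<String> walDirNames, Set<String> onlineServers) {
        Set<String> failed = new TreeSet<>();
        for (String dir : walDirNames) {
            // Strip the splitting suffix so the name is a plain "host,port,startcode".
            String sn = dir.endsWith(SPLITTING_SUFFIX)
                    ? dir.substring(0, dir.length() - SPLITTING_SUFFIX.length())
                    : dir;
            if (!onlineServers.contains(sn)) {
                failed.add(sn); // logs left behind by a server that is no longer online
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> dirs = List.of("rs1,60020,1", "rs2,60020,1-splitting", "rs3,60020,5");
        System.out.println(failedServers(dirs, Set.of("rs3,60020,5"))); // [rs1,60020,1, rs2,60020,1]
    }
}
```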
12. If the meta region was assigned to one of the dead servers, split the meta log first
ServerName oldMetaServerLocation = this.catalogTracker.getMetaLocation();
if (oldMetaServerLocation != null && previouslyFailedServers.contains(oldMetaServerLocation)) {
  splitMetaLogBeforeAssignment(oldMetaServerLocation);
}
13. Get the previously failed meta servers from ZooKeeper and merge them with the dead region servers found in step 10
Set<ServerName> previouslyFailedMetaRSs = getPreviouselyFailedMetaServersFromZK();
previouslyFailedMetaRSs.addAll(previouslyFailedServers);
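Steps 12 and 13 boil down to a membership test and a set union. Here is a hypothetical sketch with string server names (MetaRecoverySketch and its methods are illustrative only). Unlike the original code, which calls addAll on previouslyFailedMetaRSs in place, mergeFailed returns a new set and leaves its inputs untouched.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the checks made in steps 12 and 13.
final class MetaRecoverySketch {
    // Step 12: split the meta server's logs first only if it is among the failed servers.
    static boolean needsMetaLogSplit(String metaLocation, Set<String> failedServers) {
        return metaLocation != null && failedServers.contains(metaLocation);
    }

    // Step 13: union of failed meta servers (from ZooKeeper) and failed servers (from WAL dirs).
    static Set<String> mergeFailed(Set<String> failedMetaServers, Set<String> failedServers) {
        Set<String> merged = new TreeSet<>(failedMetaServers);
        merged.addAll(failedServers);
        return merged;
    }

    public static void main(String[] args) {
        Set<String> failed = Set.of("rs1,60020,1");
        System.out.println(needsMetaLogSplit("rs1,60020,1", failed));   // true
        System.out.println(needsMetaLogSplit(null, failed));            // false
        System.out.println(mergeFailed(Set.of("rs9,60020,3"), failed)); // [rs1,60020,1, rs9,60020,3]
    }
}
```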
14. Initialize the load balancer
this.balancer.setClusterStatus(getClusterStatus());
this.balancer.setMasterServices(this);
this.balancer.initialize();
15. Assign the meta region to a region server
status.setStatus("Assigning META region");
assignMeta(status, previouslyFailedMetaRSs);
16. Process the dead servers
for (ServerName tmpServer : previouslyFailedServers) {
  this.serverManager.processDeadServer(tmpServer, true);
}
17. Create and start clusterStatusChore, balancerChore and catalogJanitorChore
this.clusterStatusChore = getAndStartClusterStatusChore(this);
this.balancerChore = getAndStartBalancerChore(this);
this.catalogJanitorChore = new CatalogJanitor(this, this);
startCatalogJanitorChore();
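18. Clean up the dead-server list: remove every dead-server entry whose host name and port match a currently online server, i.e. servers that have since restarted
this.serverManager.clearDeadServersWithSameHostNameAndPortOfOnlineServer();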
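A server name encodes host, port and start code, so a region server that restarts on the same host and port gets a new name while its old name may still sit in the dead list. Here is a minimal sketch of that cleanup, assuming names are strings of the form host,port,startcode. DeadServerCleanupSketch and its methods are hypothetical, not the ServerManager or DeadServer API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of dropping dead-server entries superseded by a restarted server.
final class DeadServerCleanupSketch {
    // "host,port,startcode" -> "host,port"
    static String hostAndPort(String serverName) {
        return serverName.substring(0, serverName.lastIndexOf(','));
    }

    // Removes from `dead` every entry whose host and port match an online server,
    // and returns the removed entries.
    static List<String> clearRestarted(Set<String> dead, Set<String> online) {
        Set<String> onlineHostPorts = new HashSet<>();
        for (String sn : online) {
            onlineHostPorts.add(hostAndPort(sn));
        }
        List<String> removed = new ArrayList<>();
        Iterator<String> it = dead.iterator();
        while (it.hasNext()) {
            String sn = it.next();
            if (onlineHostPorts.contains(hostAndPort(sn))) {
                it.remove();
                removed.add(sn);
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Set<String> dead = new TreeSet<>(List.of("rs1,60020,1", "rs2,60020,1"));
        // rs1 came back on the same host and port with a new start code
        System.out.println(clearRestarted(dead, Set.of("rs1,60020,2"))); // [rs1,60020,1]
        System.out.println(dead);                                          // [rs2,60020,1]
    }
}
```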



