MasterServer
main() 启动masterserver服务run() 执行线程体
初始化netty远程服务器
romote serverclient server 注册zookeeper,启动自我容错机制启动调度程序启动Quartz执行器及异常处理 close()stop() 总图
MasterServer main() 启动masterserver服务MasterServer纯后台服务,不对外提供服务,因此不以web启动。SpringApplicationBuilder这种启动方式可以把多个方法串联起来,使用更加方便。
new SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);run() 执行线程体
● 初始化netty远程服务器
● 注册zookeeper,启动自我容错机制
● 启动调度程序
● 启动Quartz执行器及异常处理
@EventListener
public void run(ApplicationReadyEvent ignored) {
PropertyUtils.setValue(SPRING_DATASOURCE_DRIVER_CLASS_NAME, driverClassName);
// init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
TaskAckProcessor ackProcessor = new TaskAckProcessor();
ackProcessor.init(processInstanceExecMaps);
TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor();
taskResponseProcessor.init(processInstanceExecMaps);
TaskKillResponseProcessor taskKillResponseProcessor = new TaskKillResponseProcessor();
taskKillResponseProcessor.init(processInstanceExecMaps);
StateEventProcessor stateEventProcessor = new StateEventProcessor();
stateEventProcessor.init(processInstanceExecMaps);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor());
this.nettyRemotingServer.start();
// self tolerant
this.masterRegistryClient.init(this.processInstanceExecMaps);
this.masterRegistryClient.setRegistryStoppable(this);
this.masterRegistryClient.start();
this.eventExecuteService.init(this.processInstanceExecMaps);
this.eventExecuteService.start();
// scheduler start
this.masterSchedulerService.init(this.processInstanceExecMaps);
this.masterSchedulerService.start();
this.failoverExecuteThread.start();
// start QuartzExecutors
// what system should do if exception
try {
logger.info("start Quartz server...");
QuartzExecutors.getInstance().start();
} catch (Exception e) {
try {
QuartzExecutors.getInstance().shutdown();
} catch (SchedulerException e1) {
logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
}
logger.error("start Quartz failed", e);
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
if (Stopper.isRunning()) {
close("shutdownHook");
}
}));
}
初始化netty远程服务器
romote server
服务配置
主服务
处理类
channel注册、取消、读取、更改、状态检查、异常捕获等
NettyRemotingClient:netty处理客户端(master、worker、alert)
处理类
依赖关系
通过MasterRegistryClient完成注册
this.masterRegistryClient.init(this.processInstanceExecMaps);
this.masterRegistryClient.setRegistryStoppable(this);
this.masterRegistryClient.start();
启动调度程序
通过MasterSchedulerService启动master调度服务
this.masterSchedulerService.init(this.processInstanceExecMaps);
this.masterSchedulerService.start();
启动Quartz执行器及异常处理
QuartzExecutors
try {
logger.info("start Quartz server...");
QuartzExecutors.getInstance().start();
} catch (Exception e) {
try {
QuartzExecutors.getInstance().shutdown();
} catch (SchedulerException e1) {
logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
}
logger.error("start Quartz failed", e);
}
close()
对应启动、运行方法,运行起来的服务都要注销或者关掉
猜测该方法应该是异常情况,比如资源不足等自动触发,因为执行脚本直接就是kill -9 来关闭服务。欢迎指正!
public void close(String cause) {
try {
// execute only once
if (Stopper.isStopped()) {
return;
}
logger.info("master server is stopping ..., cause : {}", cause);
// set stop signal is true
Stopper.stop();
try {
// thread sleep 3 seconds for thread quietly stop
Thread.sleep(3000L);
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
// close
this.masterSchedulerService.close();
this.nettyRemotingServer.close();
this.masterRegistryClient.closeRegistry();
// close quartz
try {
QuartzExecutors.getInstance().shutdown();
logger.info("Quartz service stopped");
} catch (Exception e) {
logger.warn("Quartz service stopped exception:{}", e.getMessage());
}
// close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
springApplicationContext.close();
logger.info("springApplicationContext close");
try {
// thread sleep 60 seconds for quietly stop
Thread.sleep(60000L);
} catch (Exception e) {
logger.warn("thread sleep exception ", e);
}
// Since close will be executed in hook, so we can't use System.exit here.
Runtime.getRuntime().halt(0);
} catch (Exception e) {
logger.error("master server stop exception ", e);
Runtime.getRuntime().halt(1);
}
}
stop()
调用close()方法
@Override
public void stop(String cause) {
close(cause);
}
总图



