下面就分析一下master的启动过程。
MasterServer
先看MasterServer源码概览,它是一个SpringBoot普通应用,可以有Autowired字段。有三个主要的方法:run/stop/heartBeatThread。根据经验和注释大胆猜测一下,run是master的主要启动逻辑;stop负责优雅退出(销毁资源、容灾等);heartBeatThread负责与zk的心跳。
先从非主干逻辑分析,那就是heartBeatThread。
private Runnable heartBeatThread(){
Runnable heartBeatThread = new Runnable() {
@Override
public void run() {
if(Stopper.isRunning()) {
// send heartbeat to zk
if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
return;
}
zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
}
}
};
return heartBeatThread;
}
heartBeatThread创建了一个线程,该线程就是调用了zkMasterClient.heartBeatForZk。
public void heartBeatForZk(String znode, String serverType){
try {
//check dead or not in zookeeper
if(zkClient.getState() == CuratorframeworkState.STOPPED || checkIsDeadServer(znode, serverType)){
stoppable.stop("i was judged to death, release resources and stop myself");
return;
}
byte[] bytes = zkClient.getData().forPath(znode);
String resInfoStr = new String(bytes);
String[] splits = resInfoStr.split(Constants.COMMA);
if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
return;
}
String str = splits[0] + Constants.COMMA
+ splits[1] + Constants.COMMA
+ OSUtils.cpuUsage() + Constants.COMMA
+ OSUtils.memoryUsage() + Constants.COMMA
+ OSUtils.loadAverage() + Constants.COMMA
+ splits[5] + Constants.COMMA
+ DateUtils.dateToString(new Date());
zkClient.setData().forPath(znode,str.getBytes());
} catch (Exception e) {
logger.error("heartbeat for zk failed : " + e.getMessage(), e);
stoppable.stop("heartbeat for zk exception, release resources and stop myself");
}
}
zkMasterClient.heartBeatForZk就是在master对应的zookeeper目录下,更新data值,data主要包含当前系统的资源信息:CPU、内存、平均负载。还有最后一次更新的时间。
public synchronized void stop(String cause) {
try {
//execute only once
if(Stopper.isStoped()){
return;
}
logger.info("master server is stopping ..., cause : {}", cause);
// set stop signal is true
Stopper.stop();
try {
//thread sleep 3 seconds for thread quitely stop
Thread.sleep(3000L);
}catch (Exception e){
logger.warn("thread sleep exception:" + e.getMessage(), e);
}
try {
heartbeatMasterService.shutdownNow();
}catch (Exception e){
logger.warn("heartbeat service stopped exception");
}
logger.info("heartbeat service stopped");
//close quartz
try{
QuartzExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("Quartz service stopped exception:{}",e.getMessage());
}
logger.info("Quartz service stopped");
try {
ThreadPoolExecutors.getInstance().shutdown();
}catch (Exception e){
logger.warn("threadpool service stopped exception:{}",e.getMessage());
}
logger.info("threadpool service stopped");
try {
masterSchedulerService.shutdownNow();
}catch (Exception e){
logger.warn("master scheduler service stopped exception:{}",e.getMessage());
}
logger.info("master scheduler service stopped");
try {
zkMasterClient.close();
}catch (Exception e){
logger.warn("zookeeper service stopped exception:{}",e.getMessage());
}
logger.info("zookeeper service stopped");
} catch (Exception e) {
logger.error("master server stop exception : " + e.getMessage(), e);
System.exit(-1);
}
}
来看stop,它是一个同步方法(synchronized)。为了线程安全,这一点还是比较谨慎的。还会调用Stopper.isStoped(),以便只能执行一次。
后面的逻辑就比较简单了,总结如下:
- Stopper.stop()。关闭全部线程的循环标志
- 休眠3秒
- heartbeatMasterService.shutdownNow
- QuartzExecutors.getInstance().shutdown
- ThreadPoolExecutors.getInstance().shutdown
- masterSchedulerService.shutdownNow
- zkMasterClient.close
读者要细心的分析shutdownNow和shutdown的区别。一些重要的线程还是要等待其全部执行完才能退出的,比如ThreadPoolExecutors。
但上面退出的顺序就值得商榷了。假如ThreadPoolExecutors等了很久才退出,就会造成zkMasterClient退出时间也非常久。现在还不知道其他master节点是怎么进行容灾的。假如通过HeartBeat,此时heartBeat已经停止了,应该容灾,但任务线程池还在执行,其他节点又重复启动了流程定义实例是否会有影响呢?如果通过zookeeper心跳,此时任务也没有结束,zookeeper还在连接,貌似也没法容灾吧。
从上面的分析来看,在各个while循环处理Stopper.isRunning()时,并没有做相应的退出动作,所以此处的stop并不优雅。不是说master优雅退出了,其他节点就是优雅的退出。
考虑到run源码比较长,且都是一些线程初始化、提交的逻辑,下面只分析最后一段代码。
// start QuartzExecutors
// what system should do if exception
try {
ProcessScheduleJob.init(processDao);
QuartzExecutors.getInstance().start();
} catch (Exception e) {
try {
QuartzExecutors.getInstance().shutdown();
} catch (SchedulerException e1) {
logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
}
logger.error("start Quartz failed : " + e.getMessage(), e);
}
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
if (zkMasterClient.getActiveMasterNum() <= 1) {
for (int i = 0; i < Constants.DOLPHINSCHEDULER_WARN_TIMES_FAILOVER; i++) {
zkMasterClient.getalertDao().sendServerStopedalert(
1, OSUtils.getHost(), "Master-Server");
}
}
stop("shutdownhook");
}
}));
ProcessScheduleJob.init就是给ProcessScheduleJob一个static字段赋值,也就是给所有的ProcessScheduleJob一个全局的processDao
public static void init(ProcessDao processDao) {
ProcessScheduleJob.processDao = processDao;
}
感觉源码中关于processDao的处理有点模糊不清,比较随意。有些是传参,有些是getBean,有些又是全局变量。
addShutdownHook的逻辑就比较清晰了,就是添加了进程退出的hook。先发送预警信息,然后调用stop“优雅”退出。
FetchTaskThread
worker涉及的线程主要有两个FetchTaskThread、TaskScheduleThread。
FetchTaskThread从名称上来看,应该是从zk队列拉取任务信息的。它也是Runnable的一个实现类,还是从run方法入手分析。
public void run() {
while (Stopper.isRunning()){
InterProcessMutex mutex = null;
try {
ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
//check memory and cpu usage and threads
boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor);
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
if(!runCheckFlag) {
continue;
}
//whether have tasks, if no tasks , no need lock //get all tasks
List tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
if (CollectionUtils.isEmpty(tasksQueueList)){
continue;
}
// creating distributed locks, lock path /dolphinscheduler/lock/worker
mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
zkWorkerClient.getWorkerLockPath());
// task instance id str
List taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);
for(String taskQueueStr : taskQueueStrArr){
if (StringUtils.isEmpty(taskQueueStr)) {
continue;
}
if (!checkThreadCount(poolExecutor)) {
break;
}
// get task instance id
taskInstId = getTaskInstanceId(taskQueueStr);
// mainly to wait for the master insert task to succeed
waitForTaskInstance();
taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstId);
// verify task instance is null
if (verifyTaskInstanceIsNull(taskInstance)) {
logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
removeNodeFromTaskQueue(taskQueueStr);
continue;
}
Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
taskInstance.getProcessDefine().getUserId());
// verify tenant is null
if (verifyTenantIsNull(tenant)) {
logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
removeNodeFromTaskQueue(taskQueueStr);
continue;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
String userQueue = processDao.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());
logger.info("worker fetch taskId : {} from queue ", taskInstId);
if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
continue;
}
// local execute path
String execLocalPath = getExecLocalPath();
logger.info("task instance local execute path : {} ", execLocalPath);
// init task
taskInstance.init(OSUtils.getHost(),
new Date(),
execLocalPath);
// check and create Linux users
FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
tenant.getTenantCode(), logger);
logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
// submit task
workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));
// remove node from zk
removeNodeFromTaskQueue(taskQueueStr);
}
}catch (Exception e){
logger.error("fetch task thread failure" ,e);
}finally {
AbstractZKClient.releaseMutex(mutex);
}
}
}
run还是一个while“死循环”,首先检查了当前资源是否超阈值、线程数是否够用,然后休眠1秒,判断前面的结果,为false则进入下一个循环。
调用taskQueue.getAllTasks获取当前所有的任务列表,为空则进入下一次循环。
申请InterProcessMutex锁,这样同一时刻只有一个worker节点可以从队列中poll任务。这意味着,任务会随机的在worker节点执行。抢占到锁之后,每次poll固定数量的任务。
获取到任务列表之后,就是一个for循环,依次处理任务。下面简单总结一下其逻辑。
- 判断taskQueueStr是否为空。感觉有点多此一举。
- 判断当前线程数是否够用
- 从taskQueueStr中取到任务ID。就是按照_分隔之后的第四个字段。
- 等待任务实例信息插入到数据库。循环30次,每次等待1秒。注释说数据库操作会被延迟,不知道哪里会延迟。
- 通过任务id,获取任务实例信息。
- 通过任务实例,获取租户信息。
- 通过任务实例,获取用户队列信息。为啥不在查询任务实例信息的时候,直接获取到呢?或者在getTaskInstanceDetailByTaskId一次性获取到?
- 判断任务实例是否可以在当前节点执行,不能则继续下一个任务处理。这为啥不提前判断呢?调了2次db查询才来判断?
- 任务实例初始化
- 检查目录、用户是否存在。不存在则创建用户、目录。为啥不是提前建好?每次还要检查一遍。
- 提交任务,交给TaskScheduleThread线程执行。
- 删除taskQueue中对应的任务节点。
FetchTaskThread功能就是抢占zk锁,从TaskQueue获取任务,然后创建TaskScheduleThread线程去执行。



