栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021-11-8---2021SC@SDUSC---DolphinScheduler(7)

2021-11-8---2021SC@SDUSC---DolphinScheduler(7)

2021-11-8
2021SC@SDUSC

DolphinScheduler(7)
@Override
public void add(String key, String value) {
    try {
        String taskIdPath = getTasksPath(key) + Constants.SINGLE_SLASH + value;
        String result = getZkClient().create().withMode(CreateMode.PERSISTENT).forPath(taskIdPath, Bytes.toBytes(value));

        logger.info("add task : {} to tasks queue , result success",result);
    } catch (Exception e) {
        logger.error("add task to tasks queue exception",e);
    }

}
从上下文我们知道,这里的key就是tasks_queue;
根据注释,value就是${processInstancePriority}${processInstanceId}${taskInstancePriority}_${taskId}_host1,host2...

这样来看,add就是在zk的tasks_queue父节点下创建子节点,
子节点的data就是value的值。

submit分析完,我们来继续submitWaitComplete中剩余的:waitTaskQuit。
waitTaskQuit
public Boolean waitTaskQuit(){
      //query new state
      taskInstance = processDao.findTaskInstaceById(taskinstance.getId());
      //task time out
      Boolean checkTimeout = false;
      TaskTimeoutParameter.getEnable()){...}
      if(taskTimeoutParameter.getEnable()){...}
      while(Stopper.isRunning()){
          try{
             if(this.processInstance ==null){...}
             //task instance add queue ,waiting worker to kill
             if(this.cancel||this.processInstance.getState()==ExecutionStatus.READY_STOP){...}
             if(taskInstance.getState().typeIsFinished()){
             break;
             }
             if(checkTimeout){...}
             //updateProcessInstance task instance
             taskInstance = processDao.findTaskInstanceById(taskInstance.getId());
             processInstance=processDao.findProcessInstanceById(processInstance.getId());
             Thread.sleep(Constants.SLEEP_TIME_MILLIS);
}catch(Exception e){...}
}
return true;
}
waitTaskQuit代码比较多,先从整体来分析其逻辑:

    通过taskInstance.id查询taskInstance。其实就是查询taskInstance的最新状态。
    通过参数判断是否启用超时检查
    一个while“死循环”。
    while中判断任务是否执行结束,是则退出
    获取任务实例、流程实例最新状态
    休眠1秒,进入下一次while循环

简单来说waitTaskQuit就是循环查看任务实例的状态,直至其成功。

MasterTaskExecThread的功能整体来看就是把任务实例信息插入到数据库,并放到zookeeper队列,然后循环等待任务实例的状态变成完成,并没有任何具体的执行逻辑。

1

至此,master涉及的5个主要线程,已经分析了四个(SubProcessTaskExecThread没有分析),主要功能分析结束。下面就分析一下master的启动过程。

MasterServer

@ComponentScan("org.apache.dolpinscheduler")
public class MasterServer extends AbstractServer{
   
   private static final Logger logger = LoggerFactory.getLogger(MasterServer.class);
   
   private ZKMaskerClient zkMasterClient=null;
   
   private ScheduledExecutorService heartbeatMasterService;
   
   @Autowired
   protected ProcessDao processDao;
   
   private ExecutorService masterSchedulerService;

   
   public static void main(String[] args){
   SpringApplication.run(MasterServer.class,args);
   }

@PostConstruct 
public void run(){...}


@Override
public synchronized void stop(String cause){...}


private Runnable heartBeatThread(){...}
}

先看MasterServer源码概览,它是一个SpringBoot普通应用,可以有Autowired字段。有三个主要的方法:run/stop/heartBeatThread。根据经验和注释大胆猜测一下,run是master的主要启动逻辑;stop负责优雅退出(销毁资源、容灾等);heartBeatThread负责与zk的心跳。

这次分析,我们先从非主干逻辑分析,那就是heartBeatThread。

heartBeatThread
private Runnable heartBeatThread(){
    Runnable heartBeatThread  = new Runnable() {
        @Override
        public void run() {
            if(Stopper.isRunning()) {
                // send heartbeat to zk
                if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
                    logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
                    return;
                }

                zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
            }
        }
    };
    return heartBeatThread;
}

heartBeatThread创建了一个线程,该线程就是调用了

zkMasterClient.heartBeatForZk。
public void heartBeatForZk(String znode, String serverType){
	try {

		//check dead or not in zookeeper
		if(zkClient.getState() == CuratorframeworkState.STOPPED || checkIsDeadServer(znode, serverType)){
			stoppable.stop("i was judged to death, release resources and stop myself");
			return;
		}

		byte[] bytes = zkClient.getData().forPath(znode);
		String resInfoStr = new String(bytes);
		String[] splits = resInfoStr.split(Constants.COMMA);
		if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
			return;
		}
		String str = splits[0] + Constants.COMMA
				+ splits[1] + Constants.COMMA
				+ OSUtils.cpuUsage() + Constants.COMMA
				+ OSUtils.memoryUsage() + Constants.COMMA
				+ OSUtils.loadAverage() + Constants.COMMA
				+ splits[5] + Constants.COMMA
				+ DateUtils.dateToString(new Date());
		zkClient.setData().forPath(znode,str.getBytes());

	} catch (Exception e) {
		logger.error("heartbeat for zk failed : " + e.getMessage(), e);
		stoppable.stop("heartbeat for zk exception, release resources and stop myself");
	}
}

zkMasterClient.heartBeatForZk就是在master对应的zookeeper目录下,更新data值,data主要包含当前系统的资源信息:CPU、内存、平均负载,还有最后(最近)一次更新的时间.2

Stop
public synchronized void stop(String cause) {

    try {
        //execute only once
        if(Stopper.isStoped()){
            return;
        }

        logger.info("master server is stopping ..., cause : {}", cause);

        // set stop signal is true
        Stopper.stop();

        try {
            //thread sleep 3 seconds for thread quitely stop
            Thread.sleep(3000L);
        }catch (Exception e){
            logger.warn("thread sleep exception:" + e.getMessage(), e);
        }
        try {
            heartbeatMasterService.shutdownNow();
        }catch (Exception e){
            logger.warn("heartbeat service stopped exception");
        }

        logger.info("heartbeat service stopped");

        //close quartz
        try{
            QuartzExecutors.getInstance().shutdown();
        }catch (Exception e){
            logger.warn("Quartz service stopped exception:{}",e.getMessage());
        }

        logger.info("Quartz service stopped");

        try {
            ThreadPoolExecutors.getInstance().shutdown();
        }catch (Exception e){
            logger.warn("threadpool service stopped exception:{}",e.getMessage());
        }

        logger.info("threadpool service stopped");

        try {
            masterSchedulerService.shutdownNow();
        }catch (Exception e){
            logger.warn("master scheduler service stopped exception:{}",e.getMessage());
        }

        logger.info("master scheduler service stopped");

        try {
            zkMasterClient.close();
        }catch (Exception e){
            logger.warn("zookeeper service stopped exception:{}",e.getMessage());
        }

        logger.info("zookeeper service stopped");


    } catch (Exception e) {
        logger.error("master server stop exception : " + e.getMessage(), e);
        System.exit(-1);
    }
}

stop是一个同步方法(synchronized)。为了线程安全,这一点还是比较谨慎的
还会调用Stopper.isStoped(),以便只能执行一次。
后面的逻辑总结如下:
1.Stopper.stop()。关闭全部线程的循环标志
2.休眠3秒
3.heartbeatMasterService.shutdownNow
4.QuartzExecutors.getInstance().shutdown
5.ThreadPoolExecutors.getInstance().shutdown
6.masterSchedulerService.shutdownNow
7.zkMasterClient.close
要分析shutdownNow和shutdown的区别。
一些重要的线程还是要等待其全部执行完才能退出的,如ThreadPoolExecutors。
但上面退出的顺序就值得商榷了。
假如ThreadPoolExecutors等很久才退出,会让zkMasterClient退出时间也久
现在还不知道其他master节点是怎么进行容灾的。
假如通过HeartBeat,此时heartBeat已经停止了,应该容灾,
但任务线程池还在执行,其他节点又重复启动了流程定义实例是否会有影响呢?
若通过zookeeper心跳,此时任务也未结束,zookeeper还在连接,似乎无法容灾

从上面的分析来看,在各个while循环处理Stopper.isRunning()时,
并没有做相应的退出动作,所以此处的stop并不优雅。
也不是说master优雅退出了,其他节点就是优雅的退出。

  1. Stopper.isRunning()作为一个全局变量,控制了N多的线程,每个线程都处于一个while“死循环”中。虽然都sleep一段时间,但感觉还是有点浪费。 ↩︎

  2. 我们注意到zkMasterClient的类型是ZKMasterClient,那是不是还会有一个功能类似的ZKWorkerClient?也是用来汇报worker节点的系统资源信息的? ↩︎

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/602321.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号