栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC(dolphinscheduler- common)

2021SC@SDUSC(dolphinscheduler- common)

继续分析DependentTask

allDependentTaskFinish是一个非常重要的逻辑。

private boolean allDependentTaskFinish(){
    boolean finish = true;
    for(DependentExecute dependentExecute : dependentTaskList){
        for(Map.Entry entry: dependentExecute.getDependResultMap().entrySet()) {
            if(!dependResultMap.containsKey(entry.getKey())){
                dependResultMap.put(entry.getKey(), entry.getValue());
                //save depend result to log
                logger.info("dependent item complete {} {},{}",
                        DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString());
            }
        }
        if(!dependentExecute.finish(dependentDate)){
            finish = false;
        }
    }
    return finish;
}

它遍历了dependentTaskList,通过dependentExecute.finish(dependentDate)判断了依赖的作业是否全部完成,任意一个没有完成,则退出循环,返回false。

dependentDate的值也很重要,它其实是任务的调度时间或者启动时间(补数时间)

if(taskProps.getScheduleTime() != null){
    this.dependentDate = taskProps.getScheduleTime();
}else{
    this.dependentDate = taskProps.getTaskStartTime();
}

通过一层层追踪分析DependentExecute.finish,我们定位到了DependentExecute.calculateResultForTasks,这是用来判断某个依赖项的依赖结果的。

private DependResult calculateResultForTasks(DependentItem dependentItem,
                                                    List dateIntervals) {
    DependResult result = DependResult.FAILED;
    for(DateInterval dateInterval : dateIntervals){
        ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
                                                dateInterval);
        if(processInstance == null){
            logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}",
                   dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() );
            return DependResult.FAILED;
        }
        if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
            result = getDependResultByState(processInstance.getState());
        }else{
            TaskInstance taskInstance = null;
            List taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());

            for(TaskInstance task : taskInstanceList){
                if(task.getName().equals(dependentItem.getDepTasks())){
                    taskInstance = task;
                    break;
                }
            }
            if(taskInstance == null){
                // cannot find task in the process instance
                // maybe because process instance is running or failed.
                 result = getDependResultByState(processInstance.getState());
            }else{
                result = getDependResultByState(taskInstance.getState());
            }
        }
        if(result != DependResult.SUCCESS){
            break;
        }
    }
    return result;
}

总结并简化其重要的逻辑,大概是如果依赖整个DAG,则判断流程定义实例的状态;否则依次判断依赖任务实例的状态。

DependentTask的逻辑简单清晰,就是循环等待所有的任务结束。但感觉这样设计不太好,毕竟把它当成一个普通的Task,就意味着它会占用整体的可调用的线程池。如果项目多、任务多、依赖也多的话,这个浪费还是有点大的。个人觉得DependentTask可以单独设计成一个线程,或者放到独立的线程池去运行。毕竟对于一个调度系统来说,“依赖”还是一个非常重要的概念的。

WorkerServer

最后我们分析WorkerServer,这是与master同级的类。与master分析思路一致,还是先来看stop方法。

此处补贴代码,只总结stop逻辑。

  1. 调用Stopper.stop设置全局变量。停止所有线程的“死”循环
  2. 休眠3秒
  3. 停止worker心跳。heartbeatWorkerService.shutdownNow
  4. 停止worker任务线程池。ThreadPoolExecutors.getInstance().shutdown
  5. 停止killExecutor线程池。killExecutorService.shutdownNow
  6. 停止fetchTask线程池。fetchTaskExecutorService.shutdownNow
  7. 停止zookeeper客户端。zkWorkerClient.close

heartBeatThread不再分析,其逻辑与master基本一致,就是上报worker的当前资源使用情况。

ZKWorkerClient

最后我们再来看ZKWorkerClient的逻辑,它与worker的容灾有很大关系。这是一个非常重要的逻辑和概念,下面会逐步深入分析。

private ZKWorkerClient(){
	init();
}


private void init(){

	// init system znode
	this.initSystemZNode();

	// monitor worker
	this.listenerWorker();

	// register worker
	this.registWorker();
}

先来看其初始化过程,就是一次调用initSystemZNode、listenerWorker、registWorker。

protected void initSystemZNode(){
	try {
		createNodePath(getMasterZNodeParentPath());
		createNodePath(getWorkerZNodeParentPath());
		createNodePath(getDeadZNodeParentPath());

	} catch (Exception e) {
		logger.error("init system znode failed : " + e.getMessage(),e);
	}
}

private void createNodePath(String zNodeParentPath) throws Exception {
    if(null == zkClient.checkExists().forPath(zNodeParentPath)){
        zkClient.create().creatingParentContainersIfNeeded()
				.withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
	}
}

根据initSystemZNode源码,以及涉及到的三个函数来看,就是在zookeeper中依次创建了3个节点。在worker节点初始化过程中还创建了master相关的子节点。

下面我们先分析registWorker

registWorker的源码不再贴出来,它主要就是调用registerServer(ZKNodeType.WORKER)注册了当前节点

public String registerServer(ZKNodeType zkNodeType) throws Exception {
	String registerPath = null;
	String host = OSUtils.getHost();
	if(checkZKNodeExists(host, zkNodeType)){
		logger.error("register failure , {} server already started on host : {}" ,
				zkNodeType.toString(), host);
		return registerPath;
	}
	registerPath = createZNodePath(zkNodeType);

    // handle dead server
	handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);

	return registerPath;
}

registerServer首先检查了当前节点是否存在,存在则退出;不存在则创建节点。最后调用handleDeadServer,其实就是查找死掉的节点,然后从zk中删除。

private void listenerWorker(){
	workerPathChildrenCache = new PathChildrenCache(zkClient, getZNodeParentPath(ZKNodeType.WORKER), true, defaultThreadFactory);
	try {
		workerPathChildrenCache.start();
		workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
			@Override
			public void childEvent(Curatorframework client, PathChildrenCacheEvent event) throws Exception {
				switch (event.getType()) {
					case CHILD_ADDED:
						logger.info("node added : {}" ,event.getData().getPath());
						break;
					case CHILD_REMOVED:
                        String path = event.getData().getPath();
						//find myself dead
						String serverHost = getHostByEventDataPath(path);
						if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
							return;
						}
						break;
					case CHILD_UPDATED:
						break;
					default:
						break;
				}
			}
		});
	}catch (Exception e){
		logger.error("monitor worker failed : " + e.getMessage(),e);
	}
}

listenerWorker就是监听worker的CHILD_REMOVED事件,监听到该事件之后,调用了checkServerSelfDead。worker本身并不会对其他worker节点的移除进行啥具体逻辑。

protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
	if (serverHost.equals(OSUtils.getHost())) {
		logger.error("{} server({}) of myself dead , stopping...",
				zkNodeType.toString(), serverHost);
		stoppable.stop(String.format(" {} server {} of myself dead , stopping...",
				zkNodeType.toString(), serverHost));
		return true;
	}
	return false;
}

checkServerSelfDead判断是否为当前节点,如果是则调用stoppable.stop,而stoppable是在WorkerServer.run函数中设置的。

zkWorkerClient.setStoppable(this);

listenerWorker就是监听当前节点是否超时被zookeeper删除,删除后则调用stop方法,退出。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663124.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号