栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021-11-1---2021SC@SDUSC---DolphinScheduler(6)

2021-11-1---2021SC@SDUSC---DolphinScheduler(6)

2021-11-1
2021SC@SDUSC

DolphinScheduler(6) MasterTaskExecThread
根据其定义,我们知道MasterTaskExecThread继承了MasterbaseTaskExecThread,且构造函数简单的调用了父类的构造函数。

public class MasterTaskExecThread extends MasterbaseTaskExecThread

MasterbaseTaskExecThread的构造函数也比较简单,给几个关键的字段赋初始值。

public MasterbaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
    this.processDao = BeanContext.getBean(ProcessDao.class);
    this.alertDao = BeanContext.getBean(alertDao.class);
    this.processInstance = processInstance;
    this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
    this.cancel = false;
    this.taskInstance = taskInstance;
}

1

MasterbaseTaskExecThread实现了Callable接口,
call方法又调用了submitWaitComplete,
MasterTaskExecThread类中对改方法进行了覆盖。
submitWaitComplete根据名称及其注释说明可以知道,它提交了一个任务实例,
然后等待其完成。

@Override
public Boolean submitWaitComplete() {
    Boolean result = false;
    this.taskInstance = submit();
    if(!this.taskInstance.getState().typeIsFinished()) {
        result = waitTaskQuit();
    }
    taskInstance.setEndTime(new Date());
    processDao.updateTaskInstance(taskInstance);
    logger.info("task :{} id:{}, process id:{}, exec thread completed ",
            this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
    return result;
}

该函数的逻辑简单来说就是,提交一个任务实例,等待任务完成,更新任务结束时间到数据。
2

processDao.submitTask

首先来看看作业是如何提交的,好像也比较简单,就是调用了processDao.submitTask。

protected TaskInstance submit(){
    Integer commitRetryTimes = conf.getInt(Constants.MASTER_COMMIT_RETRY_TIMES,
            Constants.defaultMasterCommitRetryTimes);
    Integer commitRetryInterval = conf.getInt(Constants.MASTER_COMMIT_RETRY_INTERVAL,
            Constants.defaultMasterCommitRetryInterval);

    int retryTimes = 1;

    while (retryTimes <= commitRetryTimes){
        try {
            TaskInstance task = processDao.submitTask(taskInstance, processInstance);
            if(task != null){
                return task;
            }
            logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes);
            Thread.sleep(commitRetryInterval);
        } catch (Exception e) {
            logger.error("task commit to mysql and queue failed : " + e.getMessage(),e);
        }
        retryTimes += 1;
    }
    return null;
}

根据前面的分析我们知道processDao就是跟数据库打交道的,这里就是把任务实例插入到了数据。3

@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
    logger.info("start submit task : {}, instance id:{}, state: {}, ",
            taskInstance.getName(), processInstance.getId(), processInstance.getState() );
    processInstance = this.findProcessInstanceDetailById(processInstance.getId());
    //submit to mysql
    TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
    if(task.isSubProcess() && !task.getState().typeIsFinished()){
        ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);

        TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
        Map subProcessParam = JSONUtils.toMap(taskNode.getParams());
        Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
        createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
    }else if(!task.getState().typeIsFinished()){
        //submit to task queue
        task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
        submitTaskToQueue(task);
    }
    logger.info("submit task :{} state:{} complete, instance id:{} state: {}  ",
            taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
    return task;
}


看其主干逻辑为
调用submitTaskInstanceToMysql把任务实例插入到数据库,
然后调用submitTaskToQueue。

submitTaskInstanceToMysql不再分析,与函数名差不多,
就是把instance插入到Mysql数据库。

submitTaskToQueue

submitTaskToQueue主干逻辑就是把taskInstance添加到了TaskQueue。

public Boolean submitTaskToQueue(TaskInstance taskInstance) {

    try{
        // task cannot submit when running
        if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
            logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName()));
            return true;
        }
        if(checkTaskExistsInTaskQueue(taskInstance)){
            logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", taskInstance.getName()));
            return true;
        }
        logger.info("task ready to queue: {}" , taskInstance);
        taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
        logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) );
        return true;
    }catch (Exception e){
        logger.error("submit task to queue Exception: ", e);
        logger.error("task queue error : %s", JSONUtils.toJson(taskInstance));
        return false;

    }
}

taskQueue是TaskQueueFactory.getTaskQueueInstance()创建的。

protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();

getTaskQueueInstance其实就是调用了TaskQueueZkImpl.getInstance(),这应该是一个遗留接口。估计设计初期是想根据配置创建不同的任务队列,比如redis或者其他,目前只支持zookeeper。

public static ITaskQueue getTaskQueueInstance() {
  String queueImplValue = CommonUtils.getQueueImplValue();
  if (StringUtils.isNotBlank(queueImplValue)) {
      logger.info("task queue impl use zookeeper ");
      return TaskQueueZkImpl.getInstance();
  }else{
    logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
    System.exit(-1);
  }

  return null;
}

这样来看submitTaskToQueue就是调用TaskQueueZkImpl.add方法,
把任务实例插入到了zookeeper实现的队列中。

4


  1. 但processDao、alertDao居然是通过BeanContext.getBean获取到的!!!个人感觉这是一个非常恶心的设计。一个优秀的设计,应该是类的创建者负责子类的参数及其功能的边界。BeanContext.getBean扩展了所有类与SpringBoot的ApplicationContext间接打交道的能力,而且无法控制,因为只要调用BeanContext.getBean都可以获取到对应的bean进行操作。 ↩︎

  2. 我们可以看出,每个任务实例都可以更新数据库,加上其他线程,对数据库的压力可能很大。如果任务非常多,并发非常大的情况下,jdbc连接线程池需要适当调大。否则,数据库会成为系统瓶颈。如果worker节点个数过多,这种压力又会几何倍数的增长。 ↩︎

  3. 难道是这样? ↩︎

  4. 既然只支持zookeeper,这段冗余代码应该删除的 ↩︎

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582619.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号