2021-11-1
2021SC@SDUSC
根据其定义,我们知道MasterTaskExecThread继承了MasterbaseTaskExecThread,且构造函数简单的调用了父类的构造函数。 public class MasterTaskExecThread extends MasterbaseTaskExecThread MasterbaseTaskExecThread的构造函数也比较简单,给几个关键的字段赋初始值。
public MasterbaseTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
this.processDao = BeanContext.getBean(ProcessDao.class);
this.alertDao = BeanContext.getBean(alertDao.class);
this.processInstance = processInstance;
this.taskQueue = TaskQueueFactory.getTaskQueueInstance();
this.cancel = false;
this.taskInstance = taskInstance;
}
1
MasterbaseTaskExecThread实现了Callable接口, call方法又调用了submitWaitComplete, MasterTaskExecThread类中对改方法进行了覆盖。 submitWaitComplete根据名称及其注释说明可以知道,它提交了一个任务实例, 然后等待其完成。
@Override
public Boolean submitWaitComplete() {
Boolean result = false;
this.taskInstance = submit();
if(!this.taskInstance.getState().typeIsFinished()) {
result = waitTaskQuit();
}
taskInstance.setEndTime(new Date());
processDao.updateTaskInstance(taskInstance);
logger.info("task :{} id:{}, process id:{}, exec thread completed ",
this.taskInstance.getName(),taskInstance.getId(), processInstance.getId() );
return result;
}
该函数的逻辑简单来说就是,提交一个任务实例,等待任务完成,更新任务结束时间到数据。
2
首先来看看作业是如何提交的,好像也比较简单,就是调用了processDao.submitTask。
protected TaskInstance submit(){
Integer commitRetryTimes = conf.getInt(Constants.MASTER_COMMIT_RETRY_TIMES,
Constants.defaultMasterCommitRetryTimes);
Integer commitRetryInterval = conf.getInt(Constants.MASTER_COMMIT_RETRY_INTERVAL,
Constants.defaultMasterCommitRetryInterval);
int retryTimes = 1;
while (retryTimes <= commitRetryTimes){
try {
TaskInstance task = processDao.submitTask(taskInstance, processInstance);
if(task != null){
return task;
}
logger.error("task commit to mysql and queue failed , task has already retry {} times, please check the database", commitRetryTimes);
Thread.sleep(commitRetryInterval);
} catch (Exception e) {
logger.error("task commit to mysql and queue failed : " + e.getMessage(),e);
}
retryTimes += 1;
}
return null;
}
根据前面的分析我们知道processDao就是跟数据库打交道的,这里就是把任务实例插入到了数据。3
@Transactional(rollbackFor = Exception.class)
public TaskInstance submitTask(TaskInstance taskInstance, ProcessInstance processInstance){
logger.info("start submit task : {}, instance id:{}, state: {}, ",
taskInstance.getName(), processInstance.getId(), processInstance.getState() );
processInstance = this.findProcessInstanceDetailById(processInstance.getId());
//submit to mysql
TaskInstance task= submitTaskInstanceToMysql(taskInstance, processInstance);
if(task.isSubProcess() && !task.getState().typeIsFinished()){
ProcessInstanceMap processInstanceMap = setProcessInstanceMap(processInstance, task);
TaskNode taskNode = JSONUtils.parseObject(task.getTaskJson(), TaskNode.class);
Map subProcessParam = JSONUtils.toMap(taskNode.getParams());
Integer defineId = Integer.parseInt(subProcessParam.get(Constants.CMDPARAM_SUB_PROCESS_DEFINE_ID));
createSubWorkProcessCommand(processInstance, processInstanceMap, defineId, task);
}else if(!task.getState().typeIsFinished()){
//submit to task queue
task.setProcessInstancePriority(processInstance.getProcessInstancePriority());
submitTaskToQueue(task);
}
logger.info("submit task :{} state:{} complete, instance id:{} state: {} ",
taskInstance.getName(), task.getState(), processInstance.getId(), processInstance.getState());
return task;
}
看其主干逻辑为
调用submitTaskInstanceToMysql把任务实例插入到数据库,
然后调用submitTaskToQueue。
submitTaskInstanceToMysql不再分析,与函数名差不多,
就是把instance插入到Mysql数据库。
submitTaskToQueue主干逻辑就是把taskInstance添加到了TaskQueue。
public Boolean submitTaskToQueue(TaskInstance taskInstance) {
try{
// task cannot submit when running
if(taskInstance.getState() == ExecutionStatus.RUNNING_EXEUTION){
logger.info(String.format("submit to task queue, but task [%s] state already be running. ", taskInstance.getName()));
return true;
}
if(checkTaskExistsInTaskQueue(taskInstance)){
logger.info(String.format("submit to task queue, but task [%s] already exists in the queue.", taskInstance.getName()));
return true;
}
logger.info("task ready to queue: {}" , taskInstance);
taskQueue.add(DOLPHINSCHEDULER_TASKS_QUEUE, taskZkInfo(taskInstance));
logger.info(String.format("master insert into queue success, task : %s", taskInstance.getName()) );
return true;
}catch (Exception e){
logger.error("submit task to queue Exception: ", e);
logger.error("task queue error : %s", JSONUtils.toJson(taskInstance));
return false;
}
}
taskQueue是TaskQueueFactory.getTaskQueueInstance()创建的。
protected ITaskQueue taskQueue = TaskQueueFactory.getTaskQueueInstance();
getTaskQueueInstance其实就是调用了TaskQueueZkImpl.getInstance(),这应该是一个遗留接口。估计设计初期是想根据配置创建不同的任务队列,比如redis或者其他,目前只支持zookeeper。
public static ITaskQueue getTaskQueueInstance() {
String queueImplValue = CommonUtils.getQueueImplValue();
if (StringUtils.isNotBlank(queueImplValue)) {
logger.info("task queue impl use zookeeper ");
return TaskQueueZkImpl.getInstance();
}else{
logger.error("property dolphinscheduler.queue.impl can't be blank, system will exit ");
System.exit(-1);
}
return null;
}
这样来看submitTaskToQueue就是调用TaskQueueZkImpl.add方法, 把任务实例插入到了zookeeper实现的队列中。
4
但processDao、alertDao居然是通过BeanContext.getBean获取到的!!!个人感觉这是一个非常恶心的设计。一个优秀的设计,应该是类的创建者负责子类的参数及其功能的边界。BeanContext.getBean扩展了所有类与SpringBoot的ApplicationContext间接打交道的能力,而且无法控制,因为只要调用BeanContext.getBean都可以获取到对应的bean进行操作。 ↩︎
我们可以看出,每个任务实例都可以更新数据库,加上其他线程,对数据库的压力可能很大。如果任务非常多,并发非常大的情况下,jdbc连接线程池需要适当调大。否则,数据库会成为系统瓶颈。如果worker节点个数过多,这种压力又会几何倍数的增长。 ↩︎
难道是这样? ↩︎
既然只支持zookeeper,这段冗余代码应该删除的 ↩︎



