继续分析DependentTask
allDependentTaskFinish是一个非常重要的逻辑。
private boolean allDependentTaskFinish(){
boolean finish = true;
for(DependentExecute dependentExecute : dependentTaskList){
for(Map.Entry entry: dependentExecute.getDependResultMap().entrySet()) {
if(!dependResultMap.containsKey(entry.getKey())){
dependResultMap.put(entry.getKey(), entry.getValue());
//save depend result to log
logger.info("dependent item complete {} {},{}",
DEPENDENT_SPLIT, entry.getKey(), entry.getValue().toString());
}
}
if(!dependentExecute.finish(dependentDate)){
finish = false;
}
}
return finish;
}
它遍历了dependentTaskList,通过dependentExecute.finish(dependentDate)判断了依赖的作业是否全部完成,任意一个没有完成,则退出循环,返回false。
dependentDate的值也很重要,它其实是任务的调度时间或者启动时间(补数时间)
if(taskProps.getScheduleTime() != null){
this.dependentDate = taskProps.getScheduleTime();
}else{
this.dependentDate = taskProps.getTaskStartTime();
}
通过一层层追踪分析DependentExecute.finish,我们定位到了DependentExecute.calculateResultForTasks,这是用来判断某个依赖项的依赖结果的。
private DependResult calculateResultForTasks(DependentItem dependentItem,
List dateIntervals) {
DependResult result = DependResult.FAILED;
for(DateInterval dateInterval : dateIntervals){
ProcessInstance processInstance = findLastProcessInterval(dependentItem.getDefinitionId(),
dateInterval);
if(processInstance == null){
logger.error("cannot find the right process instance: definition id:{}, start:{}, end:{}",
dependentItem.getDefinitionId(), dateInterval.getStartTime(), dateInterval.getEndTime() );
return DependResult.FAILED;
}
if(dependentItem.getDepTasks().equals(Constants.DEPENDENT_ALL)){
result = getDependResultByState(processInstance.getState());
}else{
TaskInstance taskInstance = null;
List taskInstanceList = processDao.findValidTaskListByProcessId(processInstance.getId());
for(TaskInstance task : taskInstanceList){
if(task.getName().equals(dependentItem.getDepTasks())){
taskInstance = task;
break;
}
}
if(taskInstance == null){
// cannot find task in the process instance
// maybe because process instance is running or failed.
result = getDependResultByState(processInstance.getState());
}else{
result = getDependResultByState(taskInstance.getState());
}
}
if(result != DependResult.SUCCESS){
break;
}
}
return result;
}
总结并简化其重要的逻辑,大概是如果依赖整个DAG,则判断流程定义实例的状态;否则依次判断依赖任务实例的状态。
DependentTask的逻辑简单清晰,就是循环等待所有的任务结束。但感觉这样设计不太好,毕竟把它当成一个普通的Task,就意味着它会占用整体的可调用的线程池。如果项目多、任务多、依赖也多的话,这个浪费还是有点大的。个人觉得DependentTask可以单独设计成一个线程,或者放到独立的线程池去运行。毕竟对于一个调度系统来说,“依赖”还是一个非常重要的概念的。
WorkerServer
最后我们分析WorkerServer,这是与master同级的类。与master分析思路一致,还是先来看stop方法。
此处补贴代码,只总结stop逻辑。
- 调用Stopper.stop设置全局变量。停止所有线程的“死”循环
- 休眠3秒
- 停止worker心跳。heartbeatWorkerService.shutdownNow
- 停止worker任务线程池。ThreadPoolExecutors.getInstance().shutdown
- 停止killExecutor线程池。killExecutorService.shutdownNow
- 停止fetchTask线程池。fetchTaskExecutorService.shutdownNow
- 停止zookeeper客户端。zkWorkerClient.close
heartBeatThread不再分析,其逻辑与master基本一致,就是上报worker的当前资源使用情况。
ZKWorkerClient
最后我们再来看ZKWorkerClient的逻辑,它与worker的容灾有很大关系。这是一个非常重要的逻辑和概念,下面会逐步深入分析。
private ZKWorkerClient(){
init();
}
private void init(){
// init system znode
this.initSystemZNode();
// monitor worker
this.listenerWorker();
// register worker
this.registWorker();
}
先来看其初始化过程,就是一次调用initSystemZNode、listenerWorker、registWorker。
protected void initSystemZNode(){
try {
createNodePath(getMasterZNodeParentPath());
createNodePath(getWorkerZNodeParentPath());
createNodePath(getDeadZNodeParentPath());
} catch (Exception e) {
logger.error("init system znode failed : " + e.getMessage(),e);
}
}
private void createNodePath(String zNodeParentPath) throws Exception {
if(null == zkClient.checkExists().forPath(zNodeParentPath)){
zkClient.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT).forPath(zNodeParentPath);
}
}
根据initSystemZNode源码,以及涉及到的三个函数来看,就是在zookeeper中依次创建了3个节点。在worker节点初始化过程中还创建了master相关的子节点。
下面我们先分析registWorker
registWorker的源码不再贴出来,它主要就是调用registerServer(ZKNodeType.WORKER)注册了当前节点
public String registerServer(ZKNodeType zkNodeType) throws Exception {
String registerPath = null;
String host = OSUtils.getHost();
if(checkZKNodeExists(host, zkNodeType)){
logger.error("register failure , {} server already started on host : {}" ,
zkNodeType.toString(), host);
return registerPath;
}
registerPath = createZNodePath(zkNodeType);
// handle dead server
handleDeadServer(registerPath, zkNodeType, Constants.DELETE_ZK_OP);
return registerPath;
}
registerServer首先检查了当前节点是否存在,存在则退出;不存在则创建节点。最后调用handleDeadServer,其实就是查找死掉的节点,然后从zk中删除。
private void listenerWorker(){
workerPathChildrenCache = new PathChildrenCache(zkClient, getZNodeParentPath(ZKNodeType.WORKER), true, defaultThreadFactory);
try {
workerPathChildrenCache.start();
workerPathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(Curatorframework client, PathChildrenCacheEvent event) throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
logger.info("node added : {}" ,event.getData().getPath());
break;
case CHILD_REMOVED:
String path = event.getData().getPath();
//find myself dead
String serverHost = getHostByEventDataPath(path);
if(checkServerSelfDead(serverHost, ZKNodeType.WORKER)){
return;
}
break;
case CHILD_UPDATED:
break;
default:
break;
}
}
});
}catch (Exception e){
logger.error("monitor worker failed : " + e.getMessage(),e);
}
}
listenerWorker就是监听worker的CHILD_REMOVED事件,监听到该事件之后,调用了checkServerSelfDead。worker本身并不会对其他worker节点的移除进行啥具体逻辑。
protected boolean checkServerSelfDead(String serverHost, ZKNodeType zkNodeType) {
if (serverHost.equals(OSUtils.getHost())) {
logger.error("{} server({}) of myself dead , stopping...",
zkNodeType.toString(), serverHost);
stoppable.stop(String.format(" {} server {} of myself dead , stopping...",
zkNodeType.toString(), serverHost));
return true;
}
return false;
}
checkServerSelfDead判断是否为当前节点,如果是则调用stoppable.stop,而stoppable是在WorkerServer.run函数中设置的。
zkWorkerClient.setStoppable(this);
listenerWorker就是监听当前节点是否超时被zookeeper删除,删除后则调用stop方法,退出。



