栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC(dolphinscheduler- common4)

2021SC@SDUSC(dolphinscheduler- common4)

下面就分析一下master的启动过程。

MasterServer

先看MasterServer源码概览,它是一个SpringBoot普通应用,可以有Autowired字段。有三个主要的方法:run/stop/heartBeatThread。根据经验和注释大胆猜测一下,run是master的主要启动逻辑;stop负责优雅退出(销毁资源、容灾等);heartBeatThread负责与zk的心跳。

先从非主干逻辑分析,那就是heartBeatThread。

private Runnable heartBeatThread(){
    Runnable heartBeatThread  = new Runnable() {
        @Override
        public void run() {
            if(Stopper.isRunning()) {
                // send heartbeat to zk
                if (StringUtils.isBlank(zkMasterClient.getMasterZNode())) {
                    logger.error("master send heartbeat to zk failed: can't find zookeeper path of master server");
                    return;
                }

                zkMasterClient.heartBeatForZk(zkMasterClient.getMasterZNode(), Constants.MASTER_PREFIX);
            }
        }
    };
    return heartBeatThread;
}

 heartBeatThread创建了一个线程,该线程就是调用了zkMasterClient.heartBeatForZk。

public void heartBeatForZk(String znode, String serverType){
	try {

		//check dead or not in zookeeper
		if(zkClient.getState() == CuratorframeworkState.STOPPED || checkIsDeadServer(znode, serverType)){
			stoppable.stop("i was judged to death, release resources and stop myself");
			return;
		}

		byte[] bytes = zkClient.getData().forPath(znode);
		String resInfoStr = new String(bytes);
		String[] splits = resInfoStr.split(Constants.COMMA);
		if (splits.length != Constants.HEARTBEAT_FOR_ZOOKEEPER_INFO_LENGTH){
			return;
		}
		String str = splits[0] + Constants.COMMA
				+ splits[1] + Constants.COMMA
				+ OSUtils.cpuUsage() + Constants.COMMA
				+ OSUtils.memoryUsage() + Constants.COMMA
				+ OSUtils.loadAverage() + Constants.COMMA
				+ splits[5] + Constants.COMMA
				+ DateUtils.dateToString(new Date());
		zkClient.setData().forPath(znode,str.getBytes());

	} catch (Exception e) {
		logger.error("heartbeat for zk failed : " + e.getMessage(), e);
		stoppable.stop("heartbeat for zk exception, release resources and stop myself");
	}
}

zkMasterClient.heartBeatForZk就是在master对应的zookeeper目录下,更新data值,data主要包含当前系统的资源信息:CPU、内存、平均负载。还有最后一次更新的时间。

public synchronized void stop(String cause) {

    try {
        //execute only once
        if(Stopper.isStoped()){
            return;
        }

        logger.info("master server is stopping ..., cause : {}", cause);

        // set stop signal is true
        Stopper.stop();

        try {
            //thread sleep 3 seconds for thread quitely stop
            Thread.sleep(3000L);
        }catch (Exception e){
            logger.warn("thread sleep exception:" + e.getMessage(), e);
        }
        try {
            heartbeatMasterService.shutdownNow();
        }catch (Exception e){
            logger.warn("heartbeat service stopped exception");
        }

        logger.info("heartbeat service stopped");

        //close quartz
        try{
            QuartzExecutors.getInstance().shutdown();
        }catch (Exception e){
            logger.warn("Quartz service stopped exception:{}",e.getMessage());
        }

        logger.info("Quartz service stopped");

        try {
            ThreadPoolExecutors.getInstance().shutdown();
        }catch (Exception e){
            logger.warn("threadpool service stopped exception:{}",e.getMessage());
        }

        logger.info("threadpool service stopped");

        try {
            masterSchedulerService.shutdownNow();
        }catch (Exception e){
            logger.warn("master scheduler service stopped exception:{}",e.getMessage());
        }

        logger.info("master scheduler service stopped");

        try {
            zkMasterClient.close();
        }catch (Exception e){
            logger.warn("zookeeper service stopped exception:{}",e.getMessage());
        }

        logger.info("zookeeper service stopped");


    } catch (Exception e) {
        logger.error("master server stop exception : " + e.getMessage(), e);
        System.exit(-1);
    }
}

来看stop,它是一个同步方法(synchronized)。为了线程安全,这一点还是比较谨慎的。还会调用Stopper.isStoped(),以便只能执行一次。

后面的逻辑就比较简单了,总结如下:

  1. Stopper.stop()。关闭全部线程的循环标志
  2. 休眠3秒
  3. heartbeatMasterService.shutdownNow
  4. QuartzExecutors.getInstance().shutdown
  5. ThreadPoolExecutors.getInstance().shutdown
  6. masterSchedulerService.shutdownNow
  7. zkMasterClient.close

读者要细心的分析shutdownNow和shutdown的区别。一些重要的线程还是要等待其全部执行完才能退出的,比如ThreadPoolExecutors。

但上面退出的顺序就值得商榷了。假如ThreadPoolExecutors等了很久才退出,就会造成zkMasterClient退出时间也非常久。现在还不知道其他master节点是怎么进行容灾的。假如通过HeartBeat,此时heartBeat已经停止了,应该容灾,但任务线程池还在执行,其他节点又重复启动了流程定义实例是否会有影响呢?如果通过zookeeper心跳,此时任务也没有结束,zookeeper还在连接,貌似也没法容灾吧。

从上面的分析来看,在各个while循环处理Stopper.isRunning()时,并没有做相应的退出动作,所以此处的stop并不优雅。不是说master优雅退出了,其他节点就是优雅的退出。

考虑到run源码比较长,且都是一些线程初始化、提交的逻辑,下面只分析最后一段代码。

// start QuartzExecutors
// what system should do if exception
try {
    ProcessScheduleJob.init(processDao);
    QuartzExecutors.getInstance().start();
} catch (Exception e) {
    try {
        QuartzExecutors.getInstance().shutdown();
    } catch (SchedulerException e1) {
        logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
    }
    logger.error("start Quartz failed : " + e.getMessage(), e);
}



Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    @Override
    public void run() {
        if (zkMasterClient.getActiveMasterNum() <= 1) {
            for (int i = 0; i < Constants.DOLPHINSCHEDULER_WARN_TIMES_FAILOVER; i++) {
                zkMasterClient.getalertDao().sendServerStopedalert(
                        1, OSUtils.getHost(), "Master-Server");
            }
        }
        stop("shutdownhook");
    }
}));

ProcessScheduleJob.init就是给ProcessScheduleJob一个static字段赋值,也就是给所有的ProcessScheduleJob一个全局的processDao

public static void init(ProcessDao processDao) {
    ProcessScheduleJob.processDao = processDao;
}

感觉源码中关于processDao的处理有点模糊不清,比较随意。有些是传参,有些是getBean,有些又是全局变量。

addShutdownHook的逻辑就比较清晰了,就是添加了进程退出的hook。先发送预警信息,然后调用stop“优雅”退出。

FetchTaskThread

worker涉及的线程主要有两个FetchTaskThread、TaskScheduleThread。

FetchTaskThread从名称上来看,应该是从zk队列拉取任务信息的。它也是Runnable的一个实现类,还是从run方法入手分析。

public void run() {
    while (Stopper.isRunning()){
        InterProcessMutex mutex = null;
        try {
            ThreadPoolExecutor poolExecutor = (ThreadPoolExecutor) workerExecService;
            //check memory and cpu usage and threads
            boolean runCheckFlag = OSUtils.checkResource(this.conf, false) && checkThreadCount(poolExecutor);

            Thread.sleep(Constants.SLEEP_TIME_MILLIS);

            if(!runCheckFlag) {
                continue;
            }

            //whether have tasks, if no tasks , no need lock  //get all tasks
            List tasksQueueList = taskQueue.getAllTasks(Constants.DOLPHINSCHEDULER_TASKS_QUEUE);
            if (CollectionUtils.isEmpty(tasksQueueList)){
                continue;
            }
            // creating distributed locks, lock path /dolphinscheduler/lock/worker
            mutex = zkWorkerClient.acquireZkLock(zkWorkerClient.getZkClient(),
                    zkWorkerClient.getWorkerLockPath());


            // task instance id str
            List taskQueueStrArr = taskQueue.poll(Constants.DOLPHINSCHEDULER_TASKS_QUEUE, taskNum);

            for(String taskQueueStr : taskQueueStrArr){
                if (StringUtils.isEmpty(taskQueueStr)) {
                    continue;
                }

                if (!checkThreadCount(poolExecutor)) {
                    break;
                }

                // get task instance id
                taskInstId = getTaskInstanceId(taskQueueStr);

                // mainly to wait for the master insert task to succeed
                waitForTaskInstance();

                taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstId);

                // verify task instance is null
                if (verifyTaskInstanceIsNull(taskInstance)) {
                    logger.warn("remove task queue : {} due to taskInstance is null", taskQueueStr);
                    removeNodeFromTaskQueue(taskQueueStr);
                    continue;
                }

                Tenant tenant = processDao.getTenantForProcess(taskInstance.getProcessInstance().getTenantId(),
                        taskInstance.getProcessDefine().getUserId());

                // verify tenant is null
                if (verifyTenantIsNull(tenant)) {
                    logger.warn("remove task queue : {} due to tenant is null", taskQueueStr);
                    removeNodeFromTaskQueue(taskQueueStr);
                    continue;
                }

                // set queue for process instance, user-specified queue takes precedence over tenant queue
                String userQueue = processDao.queryUserQueueByProcessInstanceId(taskInstance.getProcessInstanceId());
                taskInstance.getProcessInstance().setQueue(StringUtils.isEmpty(userQueue) ? tenant.getQueue() : userQueue);
                taskInstance.getProcessInstance().setTenantCode(tenant.getTenantCode());

                logger.info("worker fetch taskId : {} from queue ", taskInstId);


                if(!checkWorkerGroup(taskInstance, OSUtils.getHost())){
                    continue;
                }

                // local execute path
                String execLocalPath = getExecLocalPath();

                logger.info("task instance  local execute path : {} ", execLocalPath);

                // init task
                taskInstance.init(OSUtils.getHost(),
                        new Date(),
                        execLocalPath);

                // check and create Linux users
                FileUtils.createWorkDirAndUserIfAbsent(execLocalPath,
                        tenant.getTenantCode(), logger);

                logger.info("task : {} ready to submit to task scheduler thread",taskInstId);
                // submit task
                workerExecService.submit(new TaskScheduleThread(taskInstance, processDao));

                // remove node from zk
                removeNodeFromTaskQueue(taskQueueStr);
            }

        }catch (Exception e){
            logger.error("fetch task thread failure" ,e);
        }finally {
            AbstractZKClient.releaseMutex(mutex);
        }
    }
}

run还是一个while“死循环”,首先检查了当前资源是否超阈值、线程数是否够用,然后休眠1秒,判断前面的结果,为false则进入下一个循环。

调用taskQueue.getAllTasks获取当前所有的任务列表,为空则进入下一次循环。

申请InterProcessMutex锁,这样同一时刻只有一个worker节点可以从队列中poll任务。这意味着,任务会随机的在worker节点执行。抢占到锁之后,每次poll固定数量的任务。

获取到任务列表之后,就是一个for循环,依次处理任务。下面简单总结一下其逻辑。

  1. 判断taskQueueStr是否为空。感觉有点多此一举。
  2. 判断当前线程数是否够用
  3. 从taskQueueStr中取到任务ID。就是按照_分隔之后的第四个字段。
  4. 等待任务实例信息插入到数据库。循环30次,每次等待1秒。注释说数据库操作会被延迟,不知道哪里会延迟。
  5. 通过任务id,获取任务实例信息。
  6. 通过任务实例,获取租户信息。
  7. 通过任务实例,获取用户队列信息。为啥不在查询任务实例信息的时候,直接获取到呢?或者在getTaskInstanceDetailByTaskId一次性获取到?
  8. 判断任务实例是否可以在当前节点执行,不能则继续下一个任务处理。这为啥不提前判断呢?调了2次db查询才来判断?
  9. 任务实例初始化
  10. 检查目录、用户是否存在。不存在则创建用户、目录。为啥不是提前建好?每次还要检查一遍。
  11. 提交任务,交给TaskScheduleThread线程执行。
  12. 删除taskQueue中对应的任务节点。

FetchTaskThread功能就是抢占zk锁,从TaskQueue获取任务,然后创建TaskScheduleThread线程去执行。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/629758.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号