栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

dolphinscheduler 2.0.5 Master服务梳理

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

dolphinscheduler 2.0.5 Master服务梳理

Master服务梳理

MasterServer

main() 启动masterserver服务run() 执行线程体

初始化netty远程服务器

romote serverclient server 注册zookeeper,启动自我容错机制启动调度程序启动Quartz执行器及异常处理 close()stop() 总图

MasterServer

main() 启动masterserver服务

MasterServer纯后台服务,不对外提供服务,因此不以web启动。SpringApplicationBuilder这种启动方式可以把多个方法串联起来,使用更加方便。

new SpringApplicationBuilder(MasterServer.class).web(WebApplicationType.NONE).run(args);

run() 执行线程体

● 初始化netty远程服务器
● 注册zookeeper,启动自我容错机制
● 启动调度程序
● 启动Quartz执行器及异常处理

   @EventListener
    public void run(ApplicationReadyEvent ignored) {
        PropertyUtils.setValue(SPRING_DATASOURCE_DRIVER_CLASS_NAME, driverClassName);

        // init remoting server
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(masterConfig.getListenPort());
        this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
        TaskAckProcessor ackProcessor = new TaskAckProcessor();
        ackProcessor.init(processInstanceExecMaps);
        TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor();
        taskResponseProcessor.init(processInstanceExecMaps);
        TaskKillResponseProcessor taskKillResponseProcessor = new TaskKillResponseProcessor();
        taskKillResponseProcessor.init(processInstanceExecMaps);
        StateEventProcessor stateEventProcessor = new StateEventProcessor();
        stateEventProcessor.init(processInstanceExecMaps);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
        this.nettyRemotingServer.registerProcessor(CommandType.CACHE_EXPIRE, new CacheProcessor());
        this.nettyRemotingServer.start();

        // self tolerant
        this.masterRegistryClient.init(this.processInstanceExecMaps);
        this.masterRegistryClient.setRegistryStoppable(this);
        this.masterRegistryClient.start();

        this.eventExecuteService.init(this.processInstanceExecMaps);
        this.eventExecuteService.start();
        // scheduler start
        this.masterSchedulerService.init(this.processInstanceExecMaps);

        this.masterSchedulerService.start();

        this.failoverExecuteThread.start();

        // start QuartzExecutors
        // what system should do if exception
        try {
            logger.info("start Quartz server...");
            QuartzExecutors.getInstance().start();
        } catch (Exception e) {
            try {
                QuartzExecutors.getInstance().shutdown();
            } catch (SchedulerException e1) {
                logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
            }
            logger.error("start Quartz failed", e);
        }

        
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (Stopper.isRunning()) {
                close("shutdownHook");
            }
        }));

    }
初始化netty远程服务器 romote server

服务配置

主服务

处理类

channel注册、取消、读取、更改、状态检查、异常捕获等

client server

NettyRemotingClient:netty处理客户端(master、worker、alert)

处理类

依赖关系

注册zookeeper,启动自我容错机制

通过MasterRegistryClient完成注册

        this.masterRegistryClient.init(this.processInstanceExecMaps);
        this.masterRegistryClient.setRegistryStoppable(this);
        this.masterRegistryClient.start();

启动调度程序

通过MasterSchedulerService启动master调度服务

        this.masterSchedulerService.init(this.processInstanceExecMaps);
        this.masterSchedulerService.start();

启动Quartz执行器及异常处理

QuartzExecutors

 try {
            logger.info("start Quartz server...");
            QuartzExecutors.getInstance().start();
        } catch (Exception e) {
            try {
                QuartzExecutors.getInstance().shutdown();
            } catch (SchedulerException e1) {
                logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
            }
            logger.error("start Quartz failed", e);
        }

close()

对应启动、运行方法,运行起来的服务都要注销或者关掉
猜测该方法应该是异常情况,比如资源不足等自动触发,因为执行脚本直接就是kill -9 来关闭服务。欢迎指正!

public void close(String cause) {

        try {
            // execute only once
            if (Stopper.isStopped()) {
                return;
            }

            logger.info("master server is stopping ..., cause : {}", cause);

            // set stop signal is true
            Stopper.stop();

            try {
                // thread sleep 3 seconds for thread quietly stop
                Thread.sleep(3000L);
            } catch (Exception e) {
                logger.warn("thread sleep exception ", e);
            }
            // close
            this.masterSchedulerService.close();
            this.nettyRemotingServer.close();
            this.masterRegistryClient.closeRegistry();
            // close quartz
            try {
                QuartzExecutors.getInstance().shutdown();
                logger.info("Quartz service stopped");
            } catch (Exception e) {
                logger.warn("Quartz service stopped exception:{}", e.getMessage());
            }
            // close spring Context and will invoke method with @PreDestroy annotation to destory beans. like ServerNodeManager,HostManager,TaskResponseService,CuratorZookeeperClient,etc
            springApplicationContext.close();
            logger.info("springApplicationContext close");
            try {
                // thread sleep 60 seconds for quietly stop
                Thread.sleep(60000L);
            } catch (Exception e) {
                logger.warn("thread sleep exception ", e);
            }
            // Since close will be executed in hook, so we can't use System.exit here.
            Runtime.getRuntime().halt(0);
        } catch (Exception e) {
            logger.error("master server stop exception ", e);
            Runtime.getRuntime().halt(1);
        }
    }
stop()

调用close()方法

  @Override
    public void stop(String cause) {
        close(cause);
    }
总图

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781453.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号