栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

datalink work启动过程

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

datalink work启动过程

datalink work启动过程

一、启动位点管理

1、初始化位点管理缓存2、启动定时任务把内存位点信息刷到数据库或者ZK 二、启动探针管理三、初始化:刷新任务和版本(任务配置)四、任务状态管理(?)五、启动Worker keeper 监控管理六、task 启动过程

一、启动位点管理 1、初始化位点管理缓存

在运行过程中位点是存储在内存中的,初始化时从数据库或者ZK中取出来。

 this.updatedPositions = CacheBuilder.newBuilder().build(new CacheLoader() {
            @Override
            public Position load(String taskId) throws Exception {
                //从数据库或者ZK中把位点信息拉到缓存中
                Position position = taskPositionService.getPosition(taskId);
                if (position == null) {
                    return nullPosition;
                } else {
                    return position;
                }
            }
        });
2、启动定时任务把内存位点信息刷到数据库或者ZK

TaskPositionManager类中启动定时任务,按每秒的频率把位点信息刷新到数据库或者ZK(按BootMode配置来决定)

        // 启动定时工作任务
        this.executor.scheduleAtFixedRate(() -> {
            synchronized (TaskPositionManager.this) {
                List tasks = new ArrayList<>(updatePositionTasks);
                for (String taskId : tasks) {
                    // 定时将内存中的最新值持久化到存储,多次变更只刷一次
                    flush(taskId);
                }
            }
        }, period, period, TimeUnit.MILLISECONDS);
二、启动探针管理

包括任务延时监控、任务统计、任务异常、workjvm监控 、work 系统监控,都是采用定时任务把采集到的信息保存在数据库中。

    ProbeManager.java
    public void start(List probeBlackList) {
        //init
        initTaskDelayProbe(probeBlackList);
        initTaskExceptionProbe(probeBlackList);
        initTaskStatisticProbe(probeBlackList);
        initWorkerJvmStateProbe(probeBlackList);
        initWorkerSystemStateProbe(probeBlackList);

        //start
        taskDelayProbe.start();
        taskStatisticProbe.start();
        taskExceptionProbe.start();
        workerJvmStateProbe.start();
        workerSystemStateProbe.start();

        logger.info("Probe manager started.");
    }
TaskDelayProbeImpl.java

@Override
    public void start() {
        if (running) {
            return;
        }

        this.taskMonitorExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory(
                MessageFormat.format("Probe-Type-{0}", TaskDelayProbe.class.getSimpleName())));
        //启动定时监控任务
        taskMonitorExecutor.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    sendDelayReset();
                } catch (Throwable e) {
                    logger.error("Task delay probe failed.", e);
                }
            }
        }, period, period, TimeUnit.MILLISECONDS);

        logger.info("Task delay probe started.");

        this.running = true;
    }
三、初始化:刷新任务和版本(任务配置)
    TaskConfigManager.java

    public void start() {
        logger.info("Starting TaskConfigManager");
        forceRefresh();
        logger.info("Started TaskConfigManager");
    }

 @Override
    public List getActiveTaskConfigsByGroup(Long groupId) {
        Assert.notNull(groupId);

        long start = System.currentTimeMillis();
        List taskInfos = taskDAO.getListWithDeleted();
        logger.info("Time for querying task from db  is : " + (System.currentTimeMillis() - start) + "ms");

        if (taskInfos == null) {
            return Lists.newArrayList();
        } else {
            OptionalLong ol = taskInfos.stream().mapToLong(t -> t.getModifyTimeMillSeconds()).max();
            long version = ol.isPresent() ? ol.getAsLong() : -1L;
            taskInfos.forEach(t -> t.setVersion(version));
            return taskInfos.stream().filter(t -> t.getGroupId().equals(groupId) && !t.isDelete()).collect(Collectors.toList());
        }
    }
四、任务状态管理(?)
    TaskStatusManager.java
    public synchronized void start() {
        DlinkZkUtils.get()
                .zkClient()
                .subscribeStateChanges(zkStateListener);
    }
五、启动Worker keeper 监控管理
  WorkerKeeper.java
 @Override
    public void run() {
        try {
            log.info("Worker Keeper starting");

            startServices();

            log.info("Worker Keeper started");

            while (!stopping.get()) {
                tick();
            }

            halt();

            log.info("Worker Keeper stopped");
        } catch (Throwable t) {
            log.error("Uncaught errors in worker keeper thread, exiting: ", t);
            stopLatch.countDown();
            System.exit(1);
        } finally {
            stopLatch.countDown();
        }
    }

六、task 启动过程

启动日志

2021-08-29 15:20:49.670 [WorkerKeeper] INFO  com.ucar.datalink.biz.service.impl.TaskConfigServiceImpl - Time for querying task from db  is : 2ms
2021-08-29 15:20:49.671 [WorkerKeeper] INFO  c.u.datalink.worker.core.runtime.coordinate.WorkerKeeper - task 1 target state change
2021-08-29 15:20:49.671 [WorkerKeeper] INFO  c.u.datalink.worker.core.runtime.coordinate.WorkerKeeper - Handling task state change by transiting state to STARTED for task 1
2021-08-29 15:20:49.671 [WorkerKeeper] INFO  com.ucar.datalink.worker.core.runtime.Worker - Setting task 1 state to STARTED
2021-08-29 15:20:49.672 [Task-1-Reader-reader-mysql] INFO  com.ucar.datalink.reader.mysql.MysqlTaskReader - The filter for canal is test1.t_dl_test_attr,test1.t_dl_test_source.
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO  com.ucar.datalink.reader.mysql.MysqlTaskReader - init eventSink begin...
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO  com.ucar.datalink.reader.mysql.MysqlTaskReader - init eventSink end! 
	 load CanalEventSink:com.alibaba.otter.canal.sink.entry.EntryEventSink
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO  c.u.d.reader.mysql.extend.CustomCanalInstanceWithManager - init eventParser begin...
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO  c.u.d.reader.mysql.extend.CustomCanalInstanceWithManager - init eventParser end! 
	 load CanalEventParser:com.ucar.datalink.reader.mysql.extend.CustomInsideGroupMysqlEventParser
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO  com.ucar.datalink.reader.mysql.CanalTaskmetaManager - ClientIdentity is initialized for destination 1.
2021-08-29 15:20:49.677 [destination = 1 , address = localhost/127.0.0.1:3306 , EventParser] INFO  c.u.d.r.mysql.extend.CustomInsideGroupMysqlEventParser - start heart beat.... 
2021-08-29 15:20:49.677 [Task-1-Reader-reader-mysql] INFO  com.ucar.datalink.worker.core.runtime.WorkerTaskReader - TaskReader-reader-mysql finished initialization and start.
2021-08-29 15:20:49.682 [destination = 1 , address = localhost/127.0.0.1:3306 , EventParser] WARN  c.u.d.r.mysql.extend.CustomInsideGroupMysqlEventParser - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000006","position":80751,"serverId":1,"timestamp":1630221169000}}
2021-08-29 15:20:49.682 [destination = 1 , address = localhost/127.0.0.1:3306 , EventParser] INFO  c.u.d.r.mysql.extend.CustomInsideGroupMysqlEventParser - find start position : EntryPosition[included=false,journalName=mysql-bin.000006,position=80751,serverId=1,timestamp=1630221169000]

暂停任务

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/713538.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号