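I. Starting the position manager
   1. Initializing the position cache
   2. Starting a scheduled task that flushes in-memory positions to the database or ZK
II. Starting the probe manager
III. Initialization: refreshing tasks and versions (task configuration)
IV. Task status management (?)
V. Starting the WorkerKeeper (monitoring and management)
VI. Task startup process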
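I. Starting the position manager
1. Initializing the position cache
While the worker is running, positions are held in memory. On initialization they are loaded from the database or ZooKeeper (ZK). The cache is a Guava LoadingCache, so a task's position is read from storage the first time it is requested: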
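this.updatedPositions = CacheBuilder.newBuilder().build(new CacheLoader<String, Position>() {
    @Override
    public Position load(String taskId) throws Exception {
        // Pull the position from the database or ZK into the cache
        Position position = taskPositionService.getPosition(taskId);
        if (position == null) {
            // A Guava CacheLoader must not return null, so a sentinel is returned instead
            return nullPosition;
        } else {
            return position;
        }
    }
});
2. Starting a scheduled task that flushes in-memory positions to the database or ZK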
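TaskPositionManager also starts a scheduled task that flushes the positions to the database or ZK once per second. The BootMode configuration decides which of the two stores is used.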
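// Start the scheduled flush task
this.executor.scheduleAtFixedRate(() -> {
    synchronized (TaskPositionManager.this) {
        // Copy the IDs of tasks with pending updates before iterating
        List<String> tasks = new ArrayList<>(updatePositionTasks);
        for (String taskId : tasks) {
            // Persist the latest in-memory value; several changes within one period are flushed only once
            flush(taskId);
        }
    }
}, period, period, TimeUnit.MILLISECONDS);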
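The two steps above amount to a write-behind cache. Reads load lazily from storage. Writes touch only memory and mark the task dirty. A timer persists each dirty task once per period. The following is a minimal sketch of that pattern in plain JDK code, not the DataLink implementation. `PositionStore`, `PositionCacheSketch` and the `long` position type are hypothetical stand-ins for `taskPositionService`, `TaskPositionManager` and `Position`. The sketch uses a `ConcurrentHashMap`, which, like Guava's cache, rejects null values, so it also needs a sentinel.

```java
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical stand-in for the database or ZK backing store. */
interface PositionStore {
    Long load(String taskId);                 // null when nothing is stored yet
    void save(String taskId, long position);
}

/** Sketch: lazily loaded in-memory positions, flushed to the store on a fixed schedule. */
class PositionCacheSketch {
    // ConcurrentHashMap (like Guava's LoadingCache) cannot hold null, hence a sentinel value
    private static final long NULL_POSITION = Long.MIN_VALUE;

    private final Map<String, Long> positions = new ConcurrentHashMap<>();
    private final Set<String> dirty = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final PositionStore store;

    PositionCacheSketch(PositionStore store) {
        this.store = store;
    }

    /** Returns the position, loading it from the store on first access; null if none exists. */
    Long get(String taskId) {
        long p = positions.computeIfAbsent(taskId, id -> {
            Long stored = store.load(id);
            return stored == null ? NULL_POSITION : stored;
        });
        if (p == NULL_POSITION) {
            return null;
        }
        return p;
    }

    /** Records a new position in memory only and marks the task for the next flush. */
    void update(String taskId, long position) {
        positions.put(taskId, position);
        dirty.add(taskId);
    }

    /** Persists each dirty task once, however many updates it received since the last flush. */
    synchronized void flushAll() {
        for (String taskId : new ArrayList<>(dirty)) {
            dirty.remove(taskId); // clear first: an update racing with this flush re-marks the task
            store.save(taskId, positions.get(taskId));
        }
    }

    void start(long periodMillis) {
        executor.scheduleAtFixedRate(this::flushAll, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops the timer and performs a final flush so no in-memory update is lost. */
    void stop() {
        executor.shutdown();
        flushAll();
    }
}
```

In this sketch, two calls to `update("1", ...)` followed by one flush produce a single write that carries the latest value. That is the "multiple changes, one flush" behavior described in the original comment.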
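II. Starting the probe manager
The probes cover task delay monitoring, task statistics, task exceptions, worker JVM monitoring and worker system monitoring. Each probe uses a scheduled task to save the data it collects to the database.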
ProbeManager.java
public void start(List probeBlackList) {
    //init
    initTaskDelayProbe(probeBlackList);
    initTaskExceptionProbe(probeBlackList);
    initTaskStatisticProbe(probeBlackList);
    initWorkerJvmStateProbe(probeBlackList);
    initWorkerSystemStateProbe(probeBlackList);
    //start
    taskDelayProbe.start();
    taskStatisticProbe.start();
    taskExceptionProbe.start();
    workerJvmStateProbe.start();
    workerSystemStateProbe.start();
    logger.info("Probe manager started.");
}
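`ProbeManager.start` passes the black list to every `initXxxProbe` call and then starts each probe. The `init` methods are not shown, so the following minimal sketch rests on two assumptions: the black list holds probe names, and a listed probe is simply not started. `SimpleProbe` and `ProbeManagerSketch` are hypothetical names.

```java
import java.util.*;

/** Hypothetical probe abstraction; each real probe would own a scheduled task. */
interface SimpleProbe {
    void start();
}

/** Sketch: register probes by name, then start every probe that is not black-listed. */
class ProbeManagerSketch {
    // LinkedHashMap keeps registration order, so probes start in a predictable order
    private final Map<String, SimpleProbe> probes = new LinkedHashMap<>();

    void register(String name, SimpleProbe probe) {
        probes.put(name, probe);
    }

    /** Starts the non-black-listed probes and returns their names in start order. */
    List<String> start(List<String> probeBlackList) {
        Set<String> blocked = new HashSet<>(probeBlackList);
        List<String> started = new ArrayList<>();
        for (Map.Entry<String, SimpleProbe> entry : probes.entrySet()) {
            if (blocked.contains(entry.getKey())) {
                continue; // black-listed: never started
            }
            entry.getValue().start();
            started.add(entry.getKey());
        }
        return started;
    }
}
```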
TaskDelayProbeImpl.java
@Override
public void start() {
    if (running) {
        return;
    }
    this.taskMonitorExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory(
            MessageFormat.format("Probe-Type-{0}", TaskDelayProbe.class.getSimpleName())));
    // Start the scheduled monitoring task
    taskMonitorExecutor.scheduleAtFixedRate(new Runnable() {
        public void run() {
            try {
                sendDelayReset();
            } catch (Throwable e) {
                // Catch everything so the schedule keeps running
                logger.error("Task delay probe failed.", e);
            }
        }
    }, period, period, TimeUnit.MILLISECONDS);
    logger.info("Task delay probe started.");
    this.running = true;
}
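The `try`/`catch (Throwable)` inside `run()` matters. The `ScheduledExecutorService.scheduleAtFixedRate` Javadoc states that if any execution throws, subsequent executions are suppressed. Without the catch, one failed collection would silently stop the probe forever. The following minimal sketch of the same probe shape uses a lambda `ThreadFactory` in place of the project's `NamedThreadFactory`. `PeriodicProbeSketch` is a hypothetical name, and the collection step is passed in as a `Runnable`.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a periodic probe: idempotent start, named daemon thread, failures contained per run. */
class PeriodicProbeSketch {
    private final ScheduledExecutorService executor;
    private final AtomicInteger runs = new AtomicInteger();
    private volatile boolean running;

    PeriodicProbeSketch(String name) {
        // Stand-in for NamedThreadFactory: one named daemon thread per probe type
        this.executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "Probe-Type-" + name);
            t.setDaemon(true);
            return t;
        });
    }

    synchronized void start(long periodMillis, Runnable collect) {
        if (running) {
            return; // already started
        }
        executor.scheduleAtFixedRate(() -> {
            try {
                runs.incrementAndGet();
                collect.run();
            } catch (Throwable e) {
                // Letting this escape would suppress every later execution of the schedule
                System.err.println("Probe run failed: " + e);
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        running = true;
    }

    int runs() {
        return runs.get();
    }

    synchronized void stop() {
        executor.shutdownNow();
        running = false;
    }
}
```

In the sketch, `start()` is `synchronized`. This closes the window in which two concurrent callers could both pass the `running` check and schedule the task twice.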
III. Initialization: refreshing tasks and versions (task configuration)
TaskConfigManager.java
public void start() {
    logger.info("Starting TaskConfigManager");
    forceRefresh();
    logger.info("Started TaskConfigManager");
}
TaskConfigServiceImpl.java
@Override
public List getActiveTaskConfigsByGroup(Long groupId) {
    Assert.notNull(groupId);
    long start = System.currentTimeMillis();
    // Load every task, including deleted ones
    List taskInfos = taskDAO.getListWithDeleted();
    logger.info("Time for querying task from db is : " + (System.currentTimeMillis() - start) + "ms");
    if (taskInfos == null) {
        return Lists.newArrayList();
    } else {
        // Version = latest modification time across all tasks (-1 if there are none)
        OptionalLong ol = taskInfos.stream().mapToLong(t -> t.getModifyTimeMillSeconds()).max();
        long version = ol.isPresent() ? ol.getAsLong() : -1L;
        // Every task carries the same global version
        taskInfos.forEach(t -> t.setVersion(version));
        // Keep only the non-deleted tasks of the requested group
        return taskInfos.stream().filter(t -> t.getGroupId().equals(groupId) && !t.isDelete()).collect(Collectors.toList());
    }
}
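The version computed above is a single number for the whole configuration. It is the newest modification time over all tasks, taken before the group and deletion filters are applied. The following minimal sketch reproduces that computation on a hypothetical, simplified `TaskRow` type, because the real task class is not shown. `TaskVersionSketch` is also a hypothetical name.

```java
import java.util.*;
import java.util.stream.Collectors;

/** Hypothetical, simplified task record. */
class TaskRow {
    final long id;
    final long groupId;
    final long modifyTimeMillis;
    final boolean deleted;
    long version;

    TaskRow(long id, long groupId, long modifyTimeMillis, boolean deleted) {
        this.id = id;
        this.groupId = groupId;
        this.modifyTimeMillis = modifyTimeMillis;
        this.deleted = deleted;
    }
}

class TaskVersionSketch {
    /** Stamps all tasks with one global version, then returns the live tasks of one group. */
    static List<TaskRow> activeTasksByGroup(List<TaskRow> all, long groupId) {
        // Newest modification time over ALL rows, deleted rows and other groups included
        long version = all.stream().mapToLong(t -> t.modifyTimeMillis).max().orElse(-1L);
        all.forEach(t -> t.version = version);
        return all.stream()
                .filter(t -> t.groupId == groupId && !t.deleted)
                .collect(Collectors.toList());
    }
}
```

Take tasks (1, group 10, modified at 100), (2, group 20, at 300) and (3, group 10, at 500, deleted). Group 10 then returns only task 1, stamped with version 500: the deleted row still sets the version. A worker could compare this number with the last version it saw to decide whether to reload. That comparison step is an assumption and is not shown in the original.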
IV. Task status management (?)
TaskStatusManager.java
public synchronized void start() {
    DlinkZkUtils.get()
            .zkClient()
            .subscribeStateChanges(zkStateListener);
}
V. Starting the WorkerKeeper (monitoring and management)
WorkerKeeper.java
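@Override
public void run() {
    try {
        log.info("Worker Keeper starting");
        startServices();
        log.info("Worker Keeper started");
        // Main loop: keep ticking until a stop is requested
        while (!stopping.get()) {
            tick();
        }
        halt();
        log.info("Worker Keeper stopped");
    } catch (Throwable t) {
        log.error("Uncaught errors in worker keeper thread, exiting: ", t);
        // System.exit() never returns, so the finally block below will not run;
        // release the latch first so anything waiting on it (e.g. a shutdown hook) is not blocked
        stopLatch.countDown();
        System.exit(1);
    } finally {
        stopLatch.countDown();
    }
}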
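`run()` follows a common service-thread lifecycle. It starts its services, loops on `tick()` until a `stopping` flag is set, shuts down, and counts down a latch that a stopper can wait on. The following minimal sketch of that lifecycle uses JDK classes only. `KeeperLoopSketch` and its `stop(timeout)` method are hypothetical. The sketch assumes `tick()` does a bounded amount of work per call, simulated here with a short sleep, so the flag is checked regularly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch of a run loop that can be stopped from another thread and waited on. */
class KeeperLoopSketch implements Runnable {
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    private final CountDownLatch stopLatch = new CountDownLatch(1);
    final AtomicInteger ticks = new AtomicInteger();
    volatile boolean halted;

    @Override
    public void run() {
        try {
            // startServices() would go here
            while (!stopping.get()) {
                tick();
            }
            halted = true; // halt(): release resources
        } finally {
            stopLatch.countDown(); // lets stop() return
        }
    }

    /** One bounded unit of work; a real tick would poll for config or state changes. */
    private void tick() {
        ticks.incrementAndGet();
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            stopping.set(true); // treat interruption as a stop request
        }
    }

    /** Requests a stop and waits up to timeoutMillis for run() to finish. */
    boolean stop(long timeoutMillis) throws InterruptedException {
        stopping.set(true);
        return stopLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```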
VI. Task startup process
Startup log for task 1 (MySQL reader):
2021-08-29 15:20:49.670 [WorkerKeeper] INFO com.ucar.datalink.biz.service.impl.TaskConfigServiceImpl - Time for querying task from db is : 2ms
2021-08-29 15:20:49.671 [WorkerKeeper] INFO c.u.datalink.worker.core.runtime.coordinate.WorkerKeeper - task 1 target state change
2021-08-29 15:20:49.671 [WorkerKeeper] INFO c.u.datalink.worker.core.runtime.coordinate.WorkerKeeper - Handling task state change by transiting state to STARTED for task 1
2021-08-29 15:20:49.671 [WorkerKeeper] INFO com.ucar.datalink.worker.core.runtime.Worker - Setting task 1 state to STARTED
2021-08-29 15:20:49.672 [Task-1-Reader-reader-mysql] INFO com.ucar.datalink.reader.mysql.MysqlTaskReader - The filter for canal is test1.t_dl_test_attr,test1.t_dl_test_source.
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO com.ucar.datalink.reader.mysql.MysqlTaskReader - init eventSink begin...
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO com.ucar.datalink.reader.mysql.MysqlTaskReader - init eventSink end!
load CanalEventSink:com.alibaba.otter.canal.sink.entry.EntryEventSink
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO c.u.d.reader.mysql.extend.CustomCanalInstanceWithManager - init eventParser begin...
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO c.u.d.reader.mysql.extend.CustomCanalInstanceWithManager - init eventParser end!
load CanalEventParser:com.ucar.datalink.reader.mysql.extend.CustomInsideGroupMysqlEventParser
2021-08-29 15:20:49.676 [Task-1-Reader-reader-mysql] INFO com.ucar.datalink.reader.mysql.CanalTaskmetaManager - ClientIdentity is initialized for destination 1.
2021-08-29 15:20:49.677 [destination = 1 , address = localhost/127.0.0.1:3306 , EventParser] INFO c.u.d.r.mysql.extend.CustomInsideGroupMysqlEventParser - start heart beat....
2021-08-29 15:20:49.677 [Task-1-Reader-reader-mysql] INFO com.ucar.datalink.worker.core.runtime.WorkerTaskReader - TaskReader-reader-mysql finished initialization and start.
2021-08-29 15:20:49.682 [destination = 1 , address = localhost/127.0.0.1:3306 , EventParser] WARN c.u.d.r.mysql.extend.CustomInsideGroupMysqlEventParser - prepare to find start position just last position
{"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000006","position":80751,"serverId":1,"timestamp":1630221169000}}
2021-08-29 15:20:49.682 [destination = 1 , address = localhost/127.0.0.1:3306 , EventParser] INFO c.u.d.r.mysql.extend.CustomInsideGroupMysqlEventParser - find start position : EntryPosition[included=false,journalName=mysql-bin.000006,position=80751,serverId=1,timestamp=1630221169000]
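In the log, the WorkerKeeper detects a target-state change for task 1 and moves it to STARTED. Only after that does the MySQL reader initialize and resume from the last stored binlog position. The following minimal sketch shows that reconcile step. It assumes a simple two-state model (`STARTED`/`PAUSED`) and an in-memory map of current states. `TargetState` and `TaskStateSketch` are hypothetical names, not DataLink classes.

```java
import java.util.*;

/** Hypothetical two-state model; pausing is the reverse transition. */
enum TargetState { STARTED, PAUSED }

class TaskStateSketch {
    private final Map<Long, TargetState> current = new HashMap<>();
    private final List<String> transitions = new ArrayList<>();

    /** Compares configured target states with current ones and transitions only tasks that differ. */
    void reconcile(Map<Long, TargetState> configured) {
        for (Map.Entry<Long, TargetState> entry : configured.entrySet()) {
            Long taskId = entry.getKey();
            TargetState target = entry.getValue();
            if (target == current.get(taskId)) {
                continue; // unchanged: nothing to do
            }
            // A real worker would start or pause the task's reader and writers here
            current.put(taskId, target);
            transitions.add("task " + taskId + " -> " + target);
        }
    }

    TargetState stateOf(long taskId) {
        return current.get(taskId);
    }

    List<String> transitions() {
        return transitions;
    }
}
```

Reconciling the same configuration twice records a single transition, because the state comparison makes the step idempotent.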
Pausing a task



