A checkpoint runs in three phases: triggering, execution, and acknowledgement of completion.
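- The CheckpointCoordinator drives the checkpoint: the CheckpointCoordinator component on the JM periodically sends checkpoint trigger requests to the source tasks. Each source task reports the offsets it has consumed to the JM, where they are stored in the checkpoint metadata, and broadcasts a barrier downstream.
- Intermediate operators align the barriers: an intermediate operator reads its input in the StreamTaskNetworkInput component and aligns the barriers arriving on all of its channels. Once the barriers are aligned, the StreamTask's checkpoint is triggered: the state is snapshotted to an external persistent store, and an ack (carrying the task's state information) is sent to the JM.
- The JM notifies the tasks after completion: once the JM has received acks from all tasks, it considers the checkpoint complete (after it has serialized the checkpoint metadata and operator state to remote persistent storage or memory) and sends a checkpoint-complete message to every task.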
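During a checkpoint, the JM coordinates and manages all snapshots of the job centrally. The components responsible for this are created together with the ExecutionGraph.
While the ExecutionGraph is being built, the CompletedCheckpointStore, CheckpointStatsTracker and CheckpointCoordinator components are created to monitor and manage the job's checkpoints.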
public class ExecutionGraphBuilder {
    public static ExecutionGraph buildGraph(
            @Nullable ExecutionGraph prior,
            JobGraph jobGraph,
            Configuration jobManagerConfig,
            ScheduledExecutorService futureExecutor,
            Executor ioExecutor,
            SlotProvider slotProvider,
            ClassLoader classLoader,
            CheckpointRecoveryFactory recoveryFactory,
            Time rpcTimeout,
            RestartStrategy restartStrategy,
            MetricGroup metrics,
            BlobWriter blobWriter,
            Time allocationTimeout,
            Logger log,
            ShuffleMaster<?> shuffleMaster,
            JobMasterPartitionTracker partitionTracker,
            FailoverStrategy.Factory failoverStrategyFactory) throws JobExecutionException, JobException {
        // ...
        JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
        if (snapshotSettings != null) {
            // Source vertices: the CheckpointCoordinator actively triggers checkpoints on them
            List<ExecutionJobVertex> triggerVertices =
                    idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
            // ackVertices and confirmVertices contain all vertices of the JobGraph: every vertex
            // must acknowledge the checkpoint and be notified that it completed
            List<ExecutionJobVertex> ackVertices =
                    idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
            List<ExecutionJobVertex> confirmVertices =
                    idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
            // Store for the completed checkpoints (their metadata)
            CompletedCheckpointStore completedCheckpoints;
            // Generates the monotonically increasing checkpoint IDs; how many completed
            // checkpoints are kept is decided by the CompletedCheckpointStore
            CheckpointIDCounter checkpointIdCounter;
            // ...
            // Monitors and tracks checkpoint progress; the checkpoint data shown in the Web UI mainly comes from this tracker
            CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
                    historySize,
                    ackVertices,
                    snapshotSettings.getCheckpointCoordinatorConfiguration(),
                    metrics);
            // ...
            // Enable checkpointing for the job; this creates the CheckpointCoordinator
            executionGraph.enableCheckpointing(
                    chkConfig,
                    triggerVertices,
                    ackVertices,
                    confirmVertices,
                    hooks,
                    checkpointIdCounter,
                    completedCheckpoints,
                    rootBackend,
                    checkpointStatsTracker);
        }
        // ...
    }
}
public void enableCheckpointing(
        CheckpointCoordinatorConfiguration chkConfig,
        List<ExecutionJobVertex> verticesToTrigger,
        List<ExecutionJobVertex> verticesToWaitFor,
        List<ExecutionJobVertex> verticesToCommitTo,
        List<MasterTriggerRestoreHook<?>> masterHooks,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore checkpointStore,
        StateBackend checkpointStateBackend,
        CheckpointStatsTracker statsTracker) {
    // ...
    // Timer the CheckpointCoordinator uses to periodically trigger checkpoints on the source tasks
    checkpointCoordinatorTimer = Executors.newSingleThreadScheduledExecutor(
            new DispatcherThreadFactory(
                    Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
    // The checkpoint coordinator creates and holds the checkpoint state and coordinates and manages the job's checkpoints
    checkpointCoordinator = new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            tasksToTrigger,
            tasksToWaitFor,
            tasksToCommitTo,
            checkpointIDCounter,
            checkpointStore,
            checkpointStateBackend,
            ioExecutor,
            new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager);
    // Register a JobStatusListener that starts the CheckpointCoordinator once the JobStatus becomes RUNNING
    // (an interval of Long.MAX_VALUE means periodic checkpoints are disabled)
    if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
        registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    }
}
Checkpoint trigger process
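Checkpoints are triggered in two ways. For source operators, triggering is coordinated and controlled by the CheckpointCoordinator, which uses a timer to periodically trigger the checkpoint on the source tasks. For downstream operators, the barrier events sent by the upstream operators determine when the checkpoint is triggered.
CheckpointCoordinator triggers operator checkpoints
The CheckpointCoordinator triggers the checkpoints of the source operators and manages the checkpoints of the whole job; it also receives the acks that the TaskManagers return once a checkpoint has been executed.
The CheckpointCoordinator is started through a listener: when the JobStatus becomes RUNNING, the listener starts the coordinator's checkpoint scheduler.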
public class CheckpointCoordinatorDeActivator implements JobStatusListener {
    private final CheckpointCoordinator coordinator;
    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // Start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // Any other status stops the checkpoint scheduler
            coordinator.stopCheckpointScheduler();
        }
    }
}
The CheckpointCoordinator uses its timer to run the ScheduledTrigger task periodically:
public class CheckpointCoordinator {
    private final ScheduledExecutor timer;
    // ...
    // Use the timer to run the ScheduledTrigger task at a fixed rate
    private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
        return timer.scheduleAtFixedRate(
                new ScheduledTrigger(),
                initDelay, baseInterval, TimeUnit.MILLISECONDS);
    }
    private final class ScheduledTrigger implements Runnable {
        @Override
        public void run() {
            try {
                // Trigger a periodic checkpoint
                triggerCheckpoint(System.currentTimeMillis(), true);
            }
            catch (Exception e) {
                LOG.error("Exception while triggering checkpoint for job {}.", job, e);
            }
        }
    }
}
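The listener-plus-timer pattern above can be reproduced with plain JDK classes. The following is a minimal sketch under stated assumptions: `PeriodicTriggerSketch`, its `JobStatus` enum and all of its methods are hypothetical and only mimic the roles of `CheckpointCoordinatorDeActivator` and `ScheduledTrigger`; "triggering a checkpoint" is reduced to taking the next ID from a counter and printing it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of a periodic checkpoint trigger; not Flink's CheckpointCoordinator API. */
final class PeriodicTriggerSketch {
    enum JobStatus { CREATED, RUNNING, FAILING, CANCELED }

    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong checkpointIdCounter = new AtomicLong(1);
    private final long baseIntervalMillis;
    private ScheduledFuture<?> periodicTrigger; // guarded by "this"

    PeriodicTriggerSketch(long baseIntervalMillis) {
        this.baseIntervalMillis = baseIntervalMillis;
    }

    /** Listener callback: RUNNING starts the scheduler, every other status stops it. */
    synchronized void jobStatusChanges(JobStatus newStatus) {
        if (newStatus == JobStatus.RUNNING) {
            if (periodicTrigger == null) {
                periodicTrigger = timer.scheduleAtFixedRate(
                        this::runScheduledTrigger, baseIntervalMillis, baseIntervalMillis, TimeUnit.MILLISECONDS);
            }
        } else if (periodicTrigger != null) {
            periodicTrigger.cancel(false);
            periodicTrigger = null;
        }
    }

    synchronized boolean isSchedulerRunning() {
        return periodicTrigger != null;
    }

    private void runScheduledTrigger() {
        try {
            long checkpointId = checkpointIdCounter.getAndIncrement();
            System.out.println("triggering checkpoint " + checkpointId);
        } catch (RuntimeException e) {
            // A periodic task that throws is never run again, so log instead of rethrowing
            System.err.println("Exception while triggering checkpoint: " + e);
        }
    }

    synchronized void shutdown() {
        jobStatusChanges(JobStatus.CANCELED);
        timer.shutdownNow();
    }
}
```

The try/catch in `runScheduledTrigger` matters: a `ScheduledExecutorService` suppresses all later executions of a periodic task once one execution throws, which is presumably why `ScheduledTrigger` catches and logs exceptions instead of letting them escape.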
CheckpointCoordinator.triggerCheckpoint() contains quite a lot of logic; its main steps are:
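- Pre-checks before the checkpoint:
Check the execution environment and the parameters of the checkpoint, build the set of Executions for the tasks that must be triggered, and build the set of ExecutionVertices that must send an ack.
- Create the PendingCheckpoint:
From the moment the checkpoint starts until every task has returned a successful ack, the checkpoint stays in the pending state; only when all acks have arrived is it considered successful. A PendingCheckpoint stores the checkpoint ID, the ackTasks, the storage location of the snapshots, and other information.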
// Location where the state snapshots of this checkpoint are stored
final CheckpointStorageLocation checkpointStorageLocation;
final long checkpointID;
try {
    // Unique ID of the checkpoint; in an HA cluster the ID counter is backed by ZooKeeper
    checkpointID = checkpointIdCounter.getAndIncrement();
    checkpointStorageLocation = props.isSavepoint() ?
            checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation) :
            checkpointStorage.initializeLocationForCheckpoint(checkpointID);
}
catch (Throwable t) {
    int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
    LOG.warn("Failed to trigger checkpoint for job {} ({} consecutive failed attempts so far).",
            job,
            numUnsuccessful,
            t);
    throw new CheckpointException(CheckpointFailureReason.EXCEPTION, t);
}
final PendingCheckpoint checkpoint = new PendingCheckpoint(
        job,
        checkpointID,
        timestamp,
        ackTasks,
        masterHooks.keySet(),
        props,
        checkpointStorageLocation,
        executor);
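To make "pending until every ack has arrived" concrete, here is a hypothetical, heavily simplified stand-in for PendingCheckpoint. Task IDs and state handles are plain strings (an assumption for illustration), `PendingCheckpointSketch` and its methods are invented names, and the SUCCESS/DUPLICATE/UNKNOWN results only loosely imitate how the real class classifies incoming acks.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified PendingCheckpoint: remembers which tasks still owe an ack. */
final class PendingCheckpointSketch {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN }

    private final long checkpointId;
    private final Set<String> allTasks;
    private final Set<String> notYetAcknowledged;
    private final Map<String, String> stateHandles = new HashMap<>();
    private final CompletableFuture<Map<String, String>> completionFuture = new CompletableFuture<>();

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAck) {
        this.checkpointId = checkpointId;
        this.allTasks = new HashSet<>(tasksToAck);
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    /** Called once per task ack; the ack carries a (here: string) handle to the task's state. */
    synchronized AckResult acknowledgeTask(String taskId, String stateHandle) {
        if (notYetAcknowledged.remove(taskId)) {
            stateHandles.put(taskId, stateHandle);
            return AckResult.SUCCESS;
        }
        // Either this task already acked, or it is not part of this checkpoint at all
        return allTasks.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    synchronized boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    /** Turns the pending checkpoint into a "completed" one by completing its future. */
    synchronized void finalizeCheckpoint() {
        if (!isFullyAcknowledged()) {
            throw new IllegalStateException(
                    "checkpoint " + checkpointId + " still waits for " + notYetAcknowledged);
        }
        completionFuture.complete(Collections.unmodifiableMap(new HashMap<>(stateHandles)));
    }

    CompletableFuture<Map<String, String>> getCompletionFuture() {
        return completionFuture;
    }
}
```

Returning the completion future right away, as `triggerCheckpoint()` does with `checkpoint.getCompletionFuture()`, lets callers wait for the checkpoint without polling the coordinator.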
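- Triggering and completing the checkpoint
The coordinator iterates over the Executions of all source operators and triggers the checkpoint on the TaskExecutor where each of them runs.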
Execution[] executions = new Execution[tasksToTrigger.length];
// ...
// Trigger the checkpoint on every source operator
for (Execution execution: executions) {
    if (props.isSynchronous()) {
        execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
    } else {
        execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
    }
}
// Reset the counter of consecutive trigger failures and return the checkpoint's completion future
numUnsuccessfulCheckpointsTriggers.set(0);
return checkpoint.getCompletionFuture();
Each Execution then obtains the corresponding TaskManagerGateway from its LogicalSlot and calls TaskExecutor.triggerCheckpoint() through it.
The TaskExecutor looks up the corresponding Task in its taskSlotTable, which eventually calls StreamTask.triggerCheckpointAsync() to perform the checkpoint.
The Execution obtains the gateway of its TaskManager through its slot and triggers the task's checkpoint via RPC:
public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {
    private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("only synchronous savepoints are allowed to advance the watermark to MAX.");
        }
        // LogicalSlot assigned to this Execution
        final LogicalSlot slot = assignedResource;
        if (slot != null) {
            // Gateway of the TaskManager that hosts the slot
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            // RPC: ask the TaskManager to trigger the checkpoint of this task
            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }
}
The StreamTask triggers the checkpoint:
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {
    @Override
    public Future<Boolean> triggerCheckpointAsync(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            boolean advanceToEndOfEventTime) {
        // Wrap the checkpoint in a mail and hand it to the StreamTask's mailbox, which runs it on the task thread
        return mailboxProcessor.getMainMailboxExecutor().submit(
                () -> triggerCheckpoint(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime),
                "checkpoint %s with %s",
                checkpointMetaData,
                checkpointOptions);
    }
    // ...
}
Aligning barriers to trigger the checkpoint
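In a StreamTask, barrier alignment happens while network data is read through the CheckpointInputGate (a wrapper around InputGate that adds barrier alignment); the detailed flow was covered in the previous section on the StreamTask data flow.
Once the barriers of all channels are aligned, StreamTask.performCheckpoint() is called to take the snapshot of the current task.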
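The alignment rule can be sketched as follows, assuming aligned (exactly-once) checkpoints and leaving out the buffering of data from blocked channels. `BarrierAlignerSketch` and its methods are hypothetical names; Flink's real implementation also handles cancellation markers and explicitly aborts an older checkpoint when a newer barrier overtakes it, which the sketch only approximates by resetting its state.

```java
import java.util.BitSet;

/** Hypothetical sketch of aligned barrier handling; not Flink's CheckpointInputGate API. */
final class BarrierAlignerSketch {
    private final int numChannels;
    private final BitSet blockedChannels = new BitSet();
    private long currentCheckpointId = -1;
    private long lastTriggeredCheckpointId = -1;

    BarrierAlignerSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Returns true if this barrier completes the alignment, i.e. the task should checkpoint now. */
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId <= lastTriggeredCheckpointId || checkpointId < currentCheckpointId) {
            return false; // stale barrier of an older checkpoint: ignore it
        }
        if (checkpointId > currentCheckpointId) {
            // A newer checkpoint overtakes an unfinished alignment: give up the old one
            currentCheckpointId = checkpointId;
            blockedChannels.clear();
        }
        blockedChannels.set(channel); // block this channel until every channel has delivered the barrier
        if (blockedChannels.cardinality() == numChannels) {
            blockedChannels.clear();  // alignment done: release all channels
            lastTriggeredCheckpointId = checkpointId;
            return true;
        }
        return false;
    }

    /** Records arriving on a blocked channel belong after the barrier and must be buffered. */
    boolean isBlocked(int channel) {
        return blockedChannels.get(channel);
    }
}
```

Blocking a channel once its barrier has arrived is what keeps records that come after the barrier out of the snapshot, at the cost of delaying that channel until the slowest one catches up.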
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics,
        boolean advanceToEndOfTime) throws Exception {
    LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
    final long checkpointId = checkpointMetaData.getCheckpointId();
    if (isRunning) {
        actionExecutor.runThrowing(() -> {
            if (checkpointOptions.getCheckpointType().isSynchronous()) {
                setSynchronousSavepointId(checkpointId);
                if (advanceToEndOfTime) {
                    advanceToEndOfEventTime();
                }
            }
            // All of the following steps should happen atomically
            // Step (1): pre-barrier work; usually nothing, or only something lightweight
            operatorChain.prepareSnapshotPreBarrier(checkpointId);
            // Step (2): broadcast the barrier downstream
            operatorChain.broadcastCheckpointBarrier(
                    checkpointId,
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);
            // Step (3): snapshot all operators; only the synchronous part runs here, the expensive
            // part runs asynchronously so that normal record processing is not blocked
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
        });
        return true;
    } else {
        actionExecutor.runThrowing(() -> {
            // The task is not RUNNING: broadcast a CancelCheckpointMarker downstream to cancel this checkpoint
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            recordWriter.broadcastEvent(message);
        });
        return false;
    }
}
Next, we look at how the StreamTask actually performs the snapshot.
- CheckpointingOperation performs the checkpoint
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        extends AbstractInvokable
        implements AsyncExceptionHandler {
    private void checkpointState(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {
        // Create the CheckpointStreamFactory that writes the state; the Memory and FS implementations
        // write the state streams to memory and to a file system respectively
        CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
                checkpointMetaData.getCheckpointId(),
                checkpointOptions.getTargetLocation());
        // CheckpointingOperation encapsulates the actual checkpoint steps
        CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
                this,
                checkpointMetaData,
                checkpointOptions,
                storage,
                checkpointMetrics);
        // Run the checkpoint
        checkpointingOperation.executeCheckpointing();
    }
}
private static final class CheckpointingOperation {
    public void executeCheckpointing() throws Exception {
        // For every operator of the StreamTask, create an OperatorSnapshotFutures object that holds
        // its snapshot operations, and collect them in the operatorSnapshotsInProgress map
        for (StreamOperator<?> op : allOperators) {
            checkpointStreamOperator(op);
        }
        // ...
        // The AsyncCheckpointRunnable performs the actual snapshot work
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress,
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);
        // Register it so that it is closed if the task is cancelled, then run the snapshots of all operators
        // in operatorSnapshotsInProgress asynchronously on the StreamTask's asyncOperationsThreadPool
        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
    }
    private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
        if (null != op) {
            // Wrap the snapshot operations of the current operator in an OperatorSnapshotFutures
            OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions,
                    storageLocation);
            operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
        }
    }
}
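The split between a short synchronous phase on the task thread and a longer asynchronous phase on the asyncOperationsThreadPool can be sketched with `java.util.concurrent` alone. In this hypothetical `AsyncSnapshotSketch`, the synchronous part copies the state and returns a not-yet-run `RunnableFuture` (playing the role of the futures collected in OperatorSnapshotFutures); "writing to storage" is simulated by building a string, and `runIfNotDoneAndGet` is a helper written for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Hypothetical sketch of the synchronous/asynchronous snapshot split. */
final class AsyncSnapshotSketch {
    private final Map<String, Long> state = new HashMap<>(); // operator state, owned by the task thread

    void update(String key, long value) {
        state.put(key, value);
    }

    /** Synchronous part (task thread): copy the state and return the not-yet-run asynchronous part. */
    RunnableFuture<String> snapshot(long checkpointId) {
        Map<String, Long> copy = new TreeMap<>(state);
        // Asynchronous part: "writing to durable storage" is simulated by rendering a string
        return new FutureTask<>(() -> "chk-" + checkpointId + ":" + copy);
    }

    /** Runs the future on the calling thread if nobody has run it yet, then returns its result. */
    static <T> T runIfNotDoneAndGet(RunnableFuture<T> future) {
        if (!future.isDone()) {
            future.run(); // no-op if another thread is already running it
        }
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        AsyncSnapshotSketch operator = new AsyncSnapshotSketch();
        operator.update("a", 1L);
        RunnableFuture<String> pending = operator.snapshot(1L); // sync part, like op.snapshotState(...)
        operator.update("a", 99L);                               // record processing continues
        ExecutorService asyncOperationsPool = Executors.newSingleThreadExecutor();
        asyncOperationsPool.execute(pending);                    // async part, like AsyncCheckpointRunnable
        System.out.println(runIfNotDoneAndGet(pending));         // chk-1:{a=1}
        asyncOperationsPool.shutdown();
    }
}
```

Because the copy is taken before the task resumes processing, the later update to 99 is not part of checkpoint 1. Real state backends avoid a full copy (for example copy-on-write structures for heap state), but the ordering guarantee rests on the same idea.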
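- Wrapping an operator's state snapshot operations in OperatorSnapshotFutures
The following code shows that raw state and managed state are snapshotted in different ways:
(1) for raw state, the snapshot futures are obtained from the snapshotContext;
(2) managed state is handled by the operatorStateBackend and keyedStateBackend, which write the state to memory or to an external file system depending on the StateBackend implementation.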
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory) throws Exception {
    // 1. If there is a keyedStateBackend, get its KeyGroupRange
    KeyGroupRange keyGroupRange = null != keyedStateBackend ?
            keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    // 2. The OperatorSnapshotFutures object holds the state snapshot operations of this operator
    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
    // 3. Context information needed during the snapshot
    StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables());
    try {
        // Run the operator's own snapshot logic (raw state is written through the context)
        snapshotState(snapshotContext);
        // Futures for the raw keyed state and raw operator state snapshots
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
        // Register the snapshots of the operatorStateBackend and keyedStateBackend in snapshotInProgress;
        // they are executed later
        if (null != operatorStateBackend) {
            // Asynchronous future for the managed operator state snapshot
            snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
        if (null != keyedStateBackend) {
            // Asynchronous future for the managed keyed state snapshot
            snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        // ...
    }
    // snapshotInProgress now holds all snapshot operations this operator has to run
    return snapshotInProgress;
}
- Definition and execution of the AsyncCheckpointRunnable
All state snapshot operations are wrapped in OperatorSnapshotFutures objects and are finally executed by the AsyncCheckpointRunnable.
protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
    @Override
    public void run() {
        // 1. Activate the file system safety net for this thread: streams it opens are tracked and
        //    closed if they are not closed properly, which prevents resource leaks
        FileSystemSafetyNet.initializeSafetyNetForThread();
        try {
            // State that is reported to the JM
            TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());
            // Task-local state kept on the TaskExecutor (used for local recovery)
            TaskStateSnapshot localTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());
            // Go through the OperatorSnapshotFutures of all operators in the StreamTask
            for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
                OperatorID operatorID = entry.getKey();
                OperatorSnapshotFutures snapshotInProgress = entry.getValue();
                // Runs and waits for all snapshot futures, i.e. the keyed state and operator state snapshots
                OperatorSnapshotFinalizer finalizedSnapshots =
                        new OperatorSnapshotFinalizer(snapshotInProgress);
                jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID,
                        finalizedSnapshots.getJobManagerOwnedState());
                localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID,
                        finalizedSnapshots.getTaskLocalState());
            }
            // ...
            if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
                    CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                // The asynchronous part is done: report the checkpoint result and the state to the JM
                reportCompletedSnapshotStates(
                        jobManagerTaskOperatorSubtaskStates,
                        localTaskOperatorSubtaskStates,
                        asyncDurationMillis);
            } else {
                LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                        owner.getName(),
                        checkpointMetaData.getCheckpointId());
            }
        } catch (Exception e) {
            // ...
        }
    }
}
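The `compareAndSet` on `asyncCheckpointState` settles a race between the asynchronous thread finishing and the task being closed on another thread. A minimal sketch with an `AtomicReference`, assuming the three states RUNNING, DISCARDED and COMPLETED; the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the RUNNING -> COMPLETED / DISCARDED race of the async checkpoint part. */
final class AsyncCheckpointStateSketch {
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    /** Called by the async thread once all snapshot futures are done. */
    boolean reportIfStillRunning() {
        // Only the winner of the CAS may report the snapshot to the JM
        return state.compareAndSet(State.RUNNING, State.COMPLETED);
    }

    /** Called by close()/cancel on another thread. */
    boolean discardIfStillRunning() {
        // Only the winner of the CAS may clean up the state that was already written
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    State current() {
        return state.get();
    }
}
```

Whichever CAS succeeds first wins; the other side sees `false` and does nothing, so a checkpoint is never both acknowledged to the JM and discarded.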
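Managed state in operators is handled mainly by the KeyedStateBackend and OperatorStateBackend; both state backends implement the SnapshotStrategy interface, which provides the state snapshot method.
Depending on the state backend, the main SnapshotStrategy implementations are HeapSnapshotStrategy and RocksDBSnapshotStrategy, and the RocksDB strategy is further split into an incremental and a full-snapshot subclass.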
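The difference between a full and an incremental snapshot can be illustrated with sets of file names. This sketch assumes, as with RocksDB SST files, that local files never change once written, so a file uploaded by an earlier checkpoint can be referenced again instead of being uploaded a second time. `IncrementalSnapshotSketch` and `UploadPlan` are hypothetical; the real incremental strategy also has to track which uploaded files are shared between checkpoints (the SharedStateRegistry seen earlier plays a role there).

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch contrasting full and incremental snapshots of immutable files. */
final class IncrementalSnapshotSketch {
    static final class UploadPlan {
        final Set<String> toUpload; // files that must be copied to checkpoint storage
        final Set<String> reused;   // files uploaded by an earlier checkpoint, only referenced

        UploadPlan(Set<String> toUpload, Set<String> reused) {
            this.toUpload = Collections.unmodifiableSet(toUpload);
            this.reused = Collections.unmodifiableSet(reused);
        }
    }

    /** Full snapshot: every local file is written again. */
    static UploadPlan full(Set<String> localFiles) {
        return new UploadPlan(new TreeSet<>(localFiles), new TreeSet<>());
    }

    /** Incremental snapshot: immutable files that were uploaded before only need to be referenced. */
    static UploadPlan incremental(Set<String> localFiles, Set<String> alreadyUploaded) {
        Set<String> toUpload = new TreeSet<>(localFiles);
        toUpload.removeAll(alreadyUploaded);
        Set<String> reused = new TreeSet<>(localFiles);
        reused.retainAll(alreadyUploaded);
        return new UploadPlan(toUpload, reused);
    }
}
```

Files that were uploaded earlier but no longer exist locally (here `0.sst` in the test) are simply not referenced by the new checkpoint; deciding when they can be deleted is the bookkeeping problem the sketch leaves out.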
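- Sending the AcknowledgeCheckpoint message to the CheckpointCoordinator
Once all operators of the StreamTask have finished their state snapshots, the task sends its TaskStateSnapshot to the JM's CheckpointCoordinator, which completes the remaining work: confirming that acks from all tasks have been received, converting the current PendingCheckpoint into a CompletedCheckpoint, writing the checkpoint metadata to an external persistent file system, and so on.
The checkpoint acknowledgement works as follows:
(1) After all operators of the StreamTask have finished their snapshots, StreamTask.reportCompletedSnapshotStates() passes the snapshot information to the TaskStateManager;
(2) the TaskStateManager sends the checkpoint ack to the CheckpointCoordinator through the CheckpointCoordinatorGateway;
(3) on receiving the ack, the JobMaster calls SchedulerNG.acknowledgeCheckpoint(), which wraps it in an AcknowledgeCheckpoint object and passes it to the CheckpointCoordinator;
(4) the CheckpointCoordinator looks up the corresponding PendingCheckpoint and checks whether acks from all tasks have arrived; if they have, it calls completePendingCheckpoint() to complete the PendingCheckpoint;
(5) the PendingCheckpoint is converted into a CompletedCheckpoint; the checkpoint metadata and operator state are serialized to an external file system or to memory, and the CompletedCheckpoint is added to the CompletedCheckpointStore;
(6) the CheckpointCoordinator iterates over the Executions of all tasks and calls notifyCheckpointComplete on every task instance via RPC.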
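Once the CheckpointCoordinator has received acks from all tasks, it calls PendingCheckpoint.finalizeCheckpoint() to convert the PendingCheckpoint into a CompletedCheckpoint and write the checkpoint metadata, which references the operator state, to the external file system.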
public CompletedCheckpoint finalizeCheckpoint() throws IOException {
    synchronized (lock) {
        try {
            // write out the metadata
            final Savepoint savepoint = new SavepointV2(checkpointId, operatorStates.values(), masterStates);
            final CompletedCheckpointStorageLocation finalizedLocation;
            try (CheckpointMetadataOutputStream out = targetLocation.createMetadataOutputStream()) {
                Checkpoints.storeCheckpointMetadata(savepoint, out);
                finalizedLocation = out.closeAndFinalizeCheckpoint();
            }
            CompletedCheckpoint completed = new CompletedCheckpoint(
                    jobId,
                    checkpointId,
                    checkpointTimestamp,
                    System.currentTimeMillis(),
                    operatorStates,
                    masterStates,
                    props,
                    finalizedLocation);
            return completed;
        }
        catch (Throwable t) {
            onCompletionPromise.completeExceptionally(t);
            ExceptionUtils.rethrowIOException(t);
            return null; // silence the compiler
        }
    }
}
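Steps (5) and (6) of the acknowledgement flow, together with keeping only a fixed number of completed checkpoints, can be sketched as follows. `CheckpointCompletionSketch` is hypothetical: the `notifyCheckpointComplete` RPCs are modeled as `LongConsumer` callbacks, and `maxRetained` stands in for the number of retained checkpoints, which Flink configures with `state.checkpoints.num-retained`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical sketch of steps (5) and (6): store the completed checkpoint, then notify the tasks. */
final class CheckpointCompletionSketch {
    private final int maxRetained;                  // cf. state.checkpoints.num-retained
    private final Deque<Long> retained = new ArrayDeque<>();
    private final List<Long> discarded = new ArrayList<>();
    private final List<LongConsumer> tasks;         // stand-ins for the notifyCheckpointComplete RPCs

    CheckpointCompletionSketch(int maxRetained, List<LongConsumer> tasks) {
        if (maxRetained < 1) {
            throw new IllegalArgumentException("must retain at least one checkpoint");
        }
        this.maxRetained = maxRetained;
        this.tasks = List.copyOf(tasks);
    }

    /** Call only after the pending checkpoint has been acknowledged by every task. */
    void completePendingCheckpoint(long checkpointId) {
        retained.addLast(checkpointId);             // step (5): add to the completed-checkpoint store
        while (retained.size() > maxRetained) {
            discarded.add(retained.removeFirst());  // the oldest checkpoint leaves the store
        }
        for (LongConsumer task : tasks) {
            task.accept(checkpointId);              // step (6): notify every task
        }
    }

    List<Long> retainedIds() {
        return new ArrayList<>(retained);
    }

    List<Long> discardedIds() {
        return new ArrayList<>(discarded);
    }
}
```

Notifying the tasks only after the checkpoint is safely stored matters because tasks may act on the notification, for example by committing external transactions.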
Finally, SavepointV2Serializer serializes the checkpoint metadata (including the operator state handles) and writes it to the external file system or to memory:
public class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
    @Override
    public void serialize(SavepointV2 checkpointMetadata, DataOutputStream dos) throws IOException {
        // first: checkpoint ID
        dos.writeLong(checkpointMetadata.getCheckpointId());
        // second: master state
        final Collection<MasterState> masterStates = checkpointMetadata.getMasterStates();
        dos.writeInt(masterStates.size());
        for (MasterState ms : masterStates) {
            serializeMasterState(ms, dos);
        }
        // third: operator states
        Collection<OperatorState> operatorStates = checkpointMetadata.getOperatorStates();
        dos.writeInt(operatorStates.size());
        for (OperatorState operatorState : operatorStates) {
            // Operator ID
            dos.writeLong(operatorState.getOperatorID().getLowerPart());
            dos.writeLong(operatorState.getOperatorID().getUpperPart());
            // Parallelism
            int parallelism = operatorState.getParallelism();
            dos.writeInt(parallelism);
            dos.writeInt(operatorState.getMaxParallelism());
            // Formerly the chain length; always 1, kept for format compatibility
            dos.writeInt(1);
            // Sub task states
            Map<Integer, OperatorSubtaskState> subtaskStateMap = operatorState.getSubtaskStates();
            dos.writeInt(subtaskStateMap.size());
            for (Map.Entry<Integer, OperatorSubtaskState> entry : subtaskStateMap.entrySet()) {
                dos.writeInt(entry.getKey());
                serializeSubtaskState(entry.getValue(), dos);
            }
        }
    }
}
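The serializer above is a plain `DataOutputStream` format: fixed-width fields written in a fixed order, with every collection prefixed by its size so a reader knows how many entries follow. The round trip below uses a much-reduced, hypothetical layout (checkpoint ID, operator count, and per operator the two halves of the ID plus parallelism and max parallelism); `MetadataFormatSketch` is invented for illustration and is not Flink's metadata format, which also contains master states and subtask states.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, much-reduced metadata format in the spirit of the serializer above. */
final class MetadataFormatSketch {
    static final class OperatorMeta {
        final long idLower, idUpper;            // a 128-bit operator ID split into two longs
        final int parallelism, maxParallelism;

        OperatorMeta(long idLower, long idUpper, int parallelism, int maxParallelism) {
            this.idLower = idLower;
            this.idUpper = idUpper;
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }
    }

    static final class Metadata {
        final long checkpointId;
        final List<OperatorMeta> operators;

        Metadata(long checkpointId, List<OperatorMeta> operators) {
            this.checkpointId = checkpointId;
            this.operators = List.copyOf(operators);
        }
    }

    static byte[] serialize(Metadata metadata) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream dos = new DataOutputStream(bytes)) {
            dos.writeLong(metadata.checkpointId);      // first: checkpoint ID
            dos.writeInt(metadata.operators.size());   // then: number of operators that follow
            for (OperatorMeta op : metadata.operators) {
                dos.writeLong(op.idLower);
                dos.writeLong(op.idUpper);
                dos.writeInt(op.parallelism);
                dos.writeInt(op.maxParallelism);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Metadata deserialize(byte[] data) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) {
            long checkpointId = dis.readLong();        // fields are read back in exactly the written order
            int count = dis.readInt();
            List<OperatorMeta> ops = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                // Java evaluates arguments left to right, so this matches the write order
                ops.add(new OperatorMeta(dis.readLong(), dis.readLong(), dis.readInt(), dis.readInt()));
            }
            return new Metadata(checkpointId, ops);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With two operators the output is 8 + 4 + 2 * 24 = 60 bytes, since `DataOutputStream` writes a long as 8 bytes and an int as 4.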



