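The Flink source code in this article is from version 1.15-SNAPSHOT, which readers can clone from GitHub.
The basic ProcessingTimeService interface is defined in the org.apache.flink.api.common.operators package: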
@PublicEvolving
public interface ProcessingTimeService {
// Returns the current processing time
long getCurrentProcessingTime();
// Registers a timer together with a callback.
// When the system's processing time reaches timestamp, the method defined in the given ProcessingTimeCallback is invoked
ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target);
// The callback interface
@PublicEvolving
interface ProcessingTimeCallback {
// Invoked when the timer reaches its registered time
void onProcessingTime(long time) throws IOException, InterruptedException, Exception;
}
}
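The sketch below makes this contract concrete with a hypothetical `MiniProcessingTimeService` built on JUC's `ScheduledThreadPoolExecutor`. It is not Flink code. It assumes processing time is wall-clock milliseconds and simply clamps the delay at zero. Flink computes its own delay in `ProcessingTimeServiceUtil.getProcessingTimeDelay`, which may differ in detail. What the sketch shows is the shape of `registerTimer`: it takes an absolute timestamp, converts it into a relative delay for the executor, and passes the registered timestamp back to the callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical toy implementation of the contract above; not Flink code.
class MiniTimerDemo {

    // Same shape as ProcessingTimeService.ProcessingTimeCallback.
    interface ProcessingTimeCallback {
        void onProcessingTime(long time) throws Exception;
    }

    static final class MiniProcessingTimeService {
        private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

        long getCurrentProcessingTime() {
            return System.currentTimeMillis();
        }

        // Turns the absolute timestamp into a relative delay (clamped at 0) for the executor.
        ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
            long delay = Math.max(0L, timestamp - getCurrentProcessingTime());
            return executor.schedule(
                    () -> {
                        try {
                            // The callback receives the registered timestamp, not the firing time.
                            target.onProcessingTime(timestamp);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    },
                    delay,
                    TimeUnit.MILLISECONDS);
        }

        void shutdown() {
            executor.shutdownNow();
        }
    }

    // Registers one timer offsetMs in the future and returns (received - registered) timestamp.
    static long fireOnce(long offsetMs) {
        MiniProcessingTimeService service = new MiniProcessingTimeService();
        CountDownLatch fired = new CountDownLatch(1);
        AtomicLong received = new AtomicLong(-1L);
        long target = service.getCurrentProcessingTime() + offsetMs;
        service.registerTimer(target, time -> {
            received.set(time);
            fired.countDown();
        });
        try {
            fired.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            service.shutdown();
        }
        return received.get() - target;
    }

    public static void main(String[] args) {
        System.out.println(fireOnce(50)); // 0: the callback saw exactly the registered timestamp
    }
}
```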
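Streaming jobs often need to run something at a fixed rate (FixedRate) or with a fixed delay (FixedDelay). For this, Flink extends the interface with a ProcessingTimeService of the same name in the org.apache.flink.streaming.runtime.tasks package: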
public interface ProcessingTimeService
extends org.apache.flink.api.common.operators.ProcessingTimeService {
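// Runs a task periodically at a fixed rate
// This call behaves similarly to JUC's ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)
ScheduledFuture<?> scheduleAtFixedRate(
ProcessingTimeCallback callback, long initialDelay, long period);
// Runs a task periodically with a fixed delay between the end of one run and the start of the next
ScheduledFuture<?> scheduleWithFixedDelay(
ProcessingTimeCallback callback, long initialDelay, long period);
// After quiesce, newly registered timers never fire; the returned future completes once no timer callback is running
CompletableFuture<Void> quiesce();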
}
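The two periodic modes are easiest to compare with numbers. The sketch below is a deterministic model of how JUC's single-threaded scheduler picks start times. The helper is hypothetical, not Flink or JDK code. Under fixed rate, each run is planned `period` ms after the previous planned start. Under fixed delay, each run is planned `period` ms after the previous run ended. In both modes a run can start late but never overlaps the one before it.

```java
import java.util.ArrayList;
import java.util.List;

// Deterministic model of JUC's two periodic modes on a single worker thread.
class FixedRateVsFixedDelay {

    // Start times (ms) of the first n runs when every run takes runTime ms.
    static List<Long> startTimes(
            long initialDelay, long period, long runTime, int n, boolean fixedDelay) {
        List<Long> starts = new ArrayList<>();
        long planned = initialDelay; // next planned start
        long previousEnd = 0;        // end of the previous run
        for (int i = 0; i < n; i++) {
            // One thread: a run never starts before the previous run has finished.
            long start = Math.max(planned, previousEnd);
            starts.add(start);
            previousEnd = start + runTime;
            planned = fixedDelay
                    ? previousEnd + period // fixed delay: counted from the end of the last run
                    : planned + period;    // fixed rate: counted from the last planned start
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(startTimes(0, 100, 30, 3, false));  // [0, 100, 200]
        System.out.println(startTimes(0, 100, 30, 3, true));   // [0, 130, 260]
        System.out.println(startTimes(0, 100, 150, 3, false)); // [0, 150, 300]
        System.out.println(startTimes(0, 100, 150, 3, true));  // [0, 250, 500]
    }
}
```

With a 30 ms task, fixed rate keeps exactly to its 100 ms grid, while fixed delay drifts by the task's run time on every round. With a 150 ms task, the fixed-rate schedule falls behind and the runs execute back to back.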
Next, let's look at its implementation class, ProcessingTimeServiceImpl:
@Internal
class ProcessingTimeServiceImpl implements ProcessingTimeService {
private final TimerService timerService;
private final Function<ProcessingTimeCallback, ProcessingTimeCallback>
processingTimeCallbackWrapper;
private final AtomicInteger numRunningTimers;
private final CompletableFuture<Void> quiesceCompletedFuture;
private volatile boolean quiesced;
ProcessingTimeServiceImpl(
TimerService timerService,
Function<ProcessingTimeCallback, ProcessingTimeCallback>
processingTimeCallbackWrapper) {
this.timerService = timerService;
this.processingTimeCallbackWrapper = processingTimeCallbackWrapper;
this.numRunningTimers = new AtomicInteger(0);
this.quiesceCompletedFuture = new CompletableFuture<>();
this.quiesced = false;
}
@Override
public long getCurrentProcessingTime() {
// Delegate to the corresponding timerService method
return timerService.getCurrentProcessingTime();
}
@Override
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
if (isQuiesced()) {
return new NeverCompleteFuture(
ProcessingTimeServiceUtil.getProcessingTimeDelay(
timestamp, getCurrentProcessingTime()));
}
// Wrap the callback (user wrapper plus quiesce handling), then delegate to timerService
return timerService.registerTimer(
timestamp,
addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(target)));
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
ProcessingTimeCallback callback, long initialDelay, long period) {
if (isQuiesced()) {
return new NeverCompleteFuture(initialDelay);
}
// Delegate to the corresponding timerService method
return timerService.scheduleAtFixedRate(
addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(callback)),
initialDelay,
period);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
ProcessingTimeCallback callback, long initialDelay, long period) {
if (isQuiesced()) {
return new NeverCompleteFuture(initialDelay);
}
// Delegate to the corresponding timerService method
return timerService.scheduleWithFixedDelay(
addQuiesceProcessingToCallback(processingTimeCallbackWrapper.apply(callback)),
initialDelay,
period);
}
@Override
public CompletableFuture<Void> quiesce() {
if (!quiesced) {
quiesced = true;
if (numRunningTimers.get() == 0) {
quiesceCompletedFuture.complete(null);
}
}
return quiesceCompletedFuture;
}
private boolean isQuiesced() {
return quiesced;
}
private ProcessingTimeCallback addQuiesceProcessingToCallback(ProcessingTimeCallback callback) {
return timestamp -> {
if (isQuiesced()) {
return;
}
numRunningTimers.incrementAndGet();
try {
// double check to deal with the race condition:
// before executing the previous line to increase the number of running timers,
// the quiesce-completed future is already completed as the number of running
// timers is 0 and "quiesced" is true
if (!isQuiesced()) {
callback.onProcessingTime(timestamp);
}
} finally {
if (numRunningTimers.decrementAndGet() == 0 && isQuiesced()) {
quiesceCompletedFuture.complete(null);
}
}
};
}
}
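The quiesce logic is the subtlest part of ProcessingTimeServiceImpl. The following sketch copies only that bookkeeping into a hypothetical `QuiesceGate` class, run on a single thread for illustration. It makes three behaviors observable:

- Calling quiesce while a timer is running leaves the future incomplete.
- The future completes as soon as that timer finishes.
- Timers that fire afterwards skip the user callback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Stripped-down copy of the quiesce bookkeeping in ProcessingTimeServiceImpl (illustrative only).
class QuiesceGateDemo {

    interface Callback {
        void onProcessingTime(long time) throws Exception;
    }

    static final class QuiesceGate {
        private final AtomicInteger numRunningTimers = new AtomicInteger(0);
        private final CompletableFuture<Void> quiesceCompleted = new CompletableFuture<>();
        private volatile boolean quiesced;

        Callback wrap(Callback callback) {
            return timestamp -> {
                if (quiesced) {
                    return; // timer fired after quiesce(): skip the user callback
                }
                numRunningTimers.incrementAndGet();
                try {
                    if (!quiesced) { // double check, as in the original
                        callback.onProcessingTime(timestamp);
                    }
                } finally {
                    // The last running timer completes the future if quiesce() already happened.
                    if (numRunningTimers.decrementAndGet() == 0 && quiesced) {
                        quiesceCompleted.complete(null);
                    }
                }
            };
        }

        CompletableFuture<Void> quiesce() {
            if (!quiesced) {
                quiesced = true;
                if (numRunningTimers.get() == 0) {
                    quiesceCompleted.complete(null);
                }
            }
            return quiesceCompleted;
        }
    }

    private static void fire(Callback callback, long time) {
        try {
            callback.onProcessingTime(time);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    static String demo() {
        QuiesceGate gate = new QuiesceGate();
        AtomicInteger userCalls = new AtomicInteger();
        StringBuilder log = new StringBuilder();
        Callback timer = gate.wrap(t -> {
            userCalls.incrementAndGet();
            // quiesce() while this timer is still running: the future cannot complete yet
            log.append("doneInside=").append(gate.quiesce().isDone());
        });
        fire(timer, 1L);
        log.append(",doneAfter=").append(gate.quiesce().isDone());
        fire(timer, 2L); // fires after quiesce(): user code is skipped
        log.append(",userCalls=").append(userCalls.get());
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // doneInside=false,doneAfter=true,userCalls=1
    }
}
```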
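As the code above shows, ProcessingTimeServiceImpl performs all of its timer operations through the TimerService passed to its constructor. On top of that it only adds the quiesce handling (numRunningTimers and quiesceCompletedFuture).
TimerService, in turn, is simply ProcessingTimeService extended with a few lifecycle methods: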
@Internal
public interface TimerService extends ProcessingTimeService {
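// Returns whether the service has been shut down
boolean isTerminated();
// Shuts down the service and cleans up all timers registered with it.
// This is hard and immediate: it does not wait for any timer to finish.
// Any later call to registerTimer results in a hard exception.
void shutdownService();
// Shuts down the service hard, but waits (without reacting to interrupts)
// for running timers to finish or until timeoutMs has elapsed.
// Returns true only if the shutdown completed within the timeout.
boolean shutdownServiceUninterruptible(long timeoutMs);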
}
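`shutdownServiceUninterruptible` combines a hard shutdown with a bounded, interrupt-tolerant wait. A hypothetical helper with the same intent can be sketched on a plain `ScheduledThreadPoolExecutor`. It shows the general pattern only and is not a copy of Flink's implementation. The helper calls `shutdownNow()` and then loops on `awaitTermination` until the deadline. If an interrupt arrives in the meantime, it records the interrupt, keeps waiting, and restores the interrupt flag before returning.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UninterruptibleShutdownDemo {

    // Hypothetical helper in the spirit of shutdownServiceUninterruptible(timeoutMs):
    // stop hard, then wait for termination until the deadline, swallowing interrupts
    // meanwhile and restoring the interrupt flag at the end.
    static boolean shutdownUninterruptible(ScheduledThreadPoolExecutor executor, long timeoutMs) {
        executor.shutdownNow(); // drop queued timers and interrupt running ones
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        boolean interrupted = false;
        boolean terminated = executor.isTerminated();
        while (!terminated) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // timed out
            }
            try {
                terminated = executor.awaitTermination(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                interrupted = true; // remember the interrupt but keep waiting
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return terminated;
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.schedule(() -> { }, 1, TimeUnit.HOURS); // a pending timer
        System.out.println(shutdownUninterruptible(executor, 1_000)); // true
    }
}
```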
Now let's focus on SystemProcessingTimeService, the implementation of the TimerService interface.
SystemProcessingTimeService has two constructors:
@VisibleForTesting
// ExceptionHandler is the exception-handling interface
SystemProcessingTimeService(ExceptionHandler exceptionHandler) {
this(exceptionHandler, null);
}
SystemProcessingTimeService(ExceptionHandler exceptionHandler, ThreadFactory threadFactory) {
this.exceptionHandler = checkNotNull(exceptionHandler);
this.status = new AtomicInteger(STATUS_ALIVE);
this.quiesceCompletedFuture = new CompletableFuture<>();
// Note: ScheduledTaskExecutor here is implemented by extending JUC's ScheduledThreadPoolExecutor
if (threadFactory == null) {
this.timerService = new ScheduledTaskExecutor(1);
} else {
this.timerService = new ScheduledTaskExecutor(1, threadFactory);
}
// Cancelled timers are removed from the work queue immediately instead of waiting for their delay to expire
this.timerService.setRemoveOnCancelPolicy(true);
// Make sure shutdown clears all pending tasks
this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
}
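The effect of `setRemoveOnCancelPolicy(true)` can be checked directly against the JDK. In this small sketch (the class name is hypothetical), a timer scheduled one hour ahead is cancelled. With the policy on, the timer disappears from the executor's queue right away. With the policy off, it stays queued until its delay would have expired.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RemoveOnCancelDemo {

    // Schedules one far-future timer, cancels it, and reports how many tasks remain queued.
    static int queueSizeAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        // Same shutdown policies as the constructor above: nothing survives shutdown.
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        try {
            ScheduledFuture<?> timer = executor.schedule(() -> { }, 1, TimeUnit.HOURS);
            timer.cancel(false);
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(queueSizeAfterCancel(true));  // 0: removed on cancel
        System.out.println(queueSizeAfterCancel(false)); // 1: stays queued until its delay expires
    }
}
```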
SystemProcessingTimeService does not use JUC's scheduled thread pool ScheduledThreadPoolExecutor directly. Instead, it subclasses ScheduledThreadPoolExecutor to define its own pool, ScheduledTaskExecutor:
private class ScheduledTaskExecutor extends ScheduledThreadPoolExecutor {
public ScheduledTaskExecutor(int corePoolSize) {
super(corePoolSize);
}
public ScheduledTaskExecutor(int corePoolSize, ThreadFactory threadFactory) {
super(corePoolSize, threadFactory);
}
@Override
// Only terminated() is overridden: quiesceCompletedFuture is completed once the pool has fully terminated
protected void terminated() {
super.terminated();
quiesceCompletedFuture.complete(null);
}
}
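Overriding `terminated()` is the standard `ThreadPoolExecutor` hook for running code once the pool has fully shut down. The sketch below reproduces the idea behind ScheduledTaskExecutor in a hypothetical `NotifyingExecutor`. The executor completes a `CompletableFuture` from that hook, so callers can wait for termination without polling.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TerminatedHookDemo {

    // Hypothetical analogue of ScheduledTaskExecutor: completes a future on termination.
    static final class NotifyingExecutor extends ScheduledThreadPoolExecutor {
        final CompletableFuture<Void> terminatedFuture = new CompletableFuture<>();

        NotifyingExecutor(int corePoolSize) {
            super(corePoolSize);
        }

        @Override
        protected void terminated() {
            super.terminated();
            terminatedFuture.complete(null); // runs once, after the pool is fully terminated
        }
    }

    // Returns true if the future was incomplete before shutdown and completed afterwards.
    static boolean completesOnShutdown() {
        NotifyingExecutor executor = new NotifyingExecutor(1);
        executor.schedule(() -> { }, 1, TimeUnit.HOURS); // a pending timer
        boolean doneBefore = executor.terminatedFuture.isDone();
        executor.shutdownNow(); // drops the pending timer; the pool then terminates
        try {
            executor.terminatedFuture.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
        return !doneBefore;
    }

    public static void main(String[] args) {
        System.out.println(completesOnShutdown()); // true
    }
}
```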
getCurrentProcessingTime()
@Override
public long getCurrentProcessingTime() {
return System.currentTimeMillis();
}
registerTimer(long timestamp, ProcessingTimeCallback callback)
@Override
public ScheduledFuture> registerTimer(long timestamp, ProcessingTimeCallback callback) {
long delay =
ProcessingTimeServiceUtil.getProcessingTimeDelay(
timestamp, getCurrentProcessingTime());
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
// Implemented on top of timerService.schedule(...);
// timerService is a subclass of JUC's ScheduledThreadPoolExecutor,
// so the actual scheduling is done by ScheduledThreadPoolExecutor itself
return timerService.schedule(
wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(delay);
} else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
} else {
// something else happened, so propagate the exception
throw e;
}
}
}
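The `try`/`catch` in registerTimer reads `status` only when scheduling fails. A simplified, hypothetical version of that pattern follows. It returns a never-completed `CompletableFuture` where Flink returns a `NeverCompleteFuture`, and its `STATUS_*` values are made-up constants. Once the executor has been shut down, every `schedule` call throws `RejectedExecutionException`. The status then decides the outcome: a silent no-op if the service is quiesced, or an `IllegalStateException` if it is shut down.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class RejectedTimerDemo {
    // Made-up status constants, modelled on the STATUS_* fields used above.
    static final int STATUS_ALIVE = 0;
    static final int STATUS_QUIESCED = 1;
    static final int STATUS_SHUTDOWN = 2;

    final AtomicInteger status = new AtomicInteger(STATUS_ALIVE);
    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

    Future<?> registerTimer(long delayMs, Runnable task) {
        try {
            // Fast path: the status is only read if scheduling fails.
            return executor.schedule(task, delayMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            int s = status.get();
            if (s == STATUS_QUIESCED) {
                return new CompletableFuture<Void>(); // never completed, like NeverCompleteFuture
            } else if (s == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            throw e; // something else went wrong: propagate
        }
    }

    static String demo() {
        RejectedTimerDemo service = new RejectedTimerDemo();
        service.executor.shutdownNow(); // from now on schedule() is rejected

        service.status.set(STATUS_QUIESCED);
        boolean quiescedFutureDone = service.registerTimer(10, () -> { }).isDone();

        service.status.set(STATUS_SHUTDOWN);
        String shutdownOutcome;
        try {
            service.registerTimer(10, () -> { });
            shutdownOutcome = "scheduled";
        } catch (IllegalStateException e) {
            shutdownOutcome = e.getMessage();
        }
        return "quiescedFutureDone=" + quiescedFutureDone + ", shutdown=" + shutdownOutcome;
    }

    public static void main(String[] args) {
        // quiescedFutureDone=false, shutdown=Timer service is shut down
        System.out.println(demo());
    }
}
```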
scheduleAtFixedRate and scheduleWithFixedDelay
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
ProcessingTimeCallback callback, long initialDelay, long period) {
return scheduleRepeatedly(callback, initialDelay, period, false);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
ProcessingTimeCallback callback, long initialDelay, long period) {
return scheduleRepeatedly(callback, initialDelay, period, true);
}
private ScheduledFuture<?> scheduleRepeatedly(
ProcessingTimeCallback callback, long initialDelay, long period, boolean fixedDelay) {
final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
final Runnable task = wrapOnTimerCallback(callback, nextTimestamp, period);
// we directly try to register the timer and only react to the status on exception
// that way we save unnecessary volatile accesses for each timer
try {
return fixedDelay
// Both branches end up calling the JUC ScheduledThreadPoolExecutor methods
? timerService.scheduleWithFixedDelay(
task, initialDelay, period, TimeUnit.MILLISECONDS)
: timerService.scheduleAtFixedRate(
task, initialDelay, period, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
final int status = this.status.get();
if (status == STATUS_QUIESCED) {
return new NeverCompleteFuture(initialDelay);
} else if (status == STATUS_SHUTDOWN) {
throw new IllegalStateException("Timer service is shut down");
} else {
// something else happened, so propagate the exception
throw e;
}
}
}
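If you are not familiar with how the methods of JUC's ScheduledThreadPoolExecutor are implemented, see an earlier post of mine:
ScheduledThreadPoolExecutor源码解析 (ScheduledThreadPoolExecutor source code analysis)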
As you can see, registerTimer, scheduleAtFixedRate and scheduleWithFixedDelay all call wrapOnTimerCallback. That method wraps the ProcessingTimeCallback into a Runnable (a ScheduledTask) that the thread pool can execute.
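// One-shot timer (registerTimer): period 0, so the timestamp never advances
private Runnable wrapOnTimerCallback(ProcessingTimeCallback callback, long timestamp) {
return new ScheduledTask(status, exceptionHandler, callback, timestamp, 0);
}
private Runnable wrapOnTimerCallback(
ProcessingTimeCallback callback, long nextTimestamp, long period) {
return new ScheduledTask(status, exceptionHandler, callback, nextTimestamp, period);
}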
private static final class ScheduledTask implements Runnable {
private final AtomicInteger serviceStatus;
private final ExceptionHandler exceptionHandler;
// The callback to invoke
private final ProcessingTimeCallback callback;
private long nextTimestamp;
private final long period;
ScheduledTask(
AtomicInteger serviceStatus,
ExceptionHandler exceptionHandler,
ProcessingTimeCallback callback,
long timestamp,
long period) {
this.serviceStatus = serviceStatus;
this.exceptionHandler = exceptionHandler;
this.callback = callback;
this.nextTimestamp = timestamp;
this.period = period;
}
@Override
public void run() {
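// First check whether the service is still ALIVE.
// If not, it has been quiesced or shut down, so return without invoking the callback
if (serviceStatus.get() != STATUS_ALIVE) {
return;
}
try {
// Invoke the callback's onProcessingTime method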
callback.onProcessingTime(nextTimestamp);
} catch (Exception ex) {
exceptionHandler.handleException(ex);
}
// Advance to the timestamp of the next periodic run (unchanged when period is 0)
nextTimestamp += period;
}
}
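The timestamp handling inside ScheduledTask can be exercised without a thread pool by calling `run()` by hand. In this sketch the class is a trimmed copy: a `Consumer<Exception>` stands in for Flink's ExceptionHandler, and the status constants are assumed values. The demo shows three things:

- Each run passes the current `nextTimestamp` to the callback.
- An exception goes to the handler, and the timestamp still advances.
- Nothing runs once the status is no longer ALIVE.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ScheduledTaskDemo {
    // Assumed status values; Flink defines its own STATUS_* constants.
    static final int STATUS_ALIVE = 0;
    static final int STATUS_QUIESCED = 1;

    interface ProcessingTimeCallback {
        void onProcessingTime(long time) throws Exception;
    }

    // Trimmed copy of ScheduledTask; Consumer<Exception> replaces ExceptionHandler.
    static final class ScheduledTask implements Runnable {
        private final AtomicInteger serviceStatus;
        private final Consumer<Exception> exceptionHandler;
        private final ProcessingTimeCallback callback;
        private long nextTimestamp;
        private final long period;

        ScheduledTask(AtomicInteger serviceStatus, Consumer<Exception> exceptionHandler,
                ProcessingTimeCallback callback, long timestamp, long period) {
            this.serviceStatus = serviceStatus;
            this.exceptionHandler = exceptionHandler;
            this.callback = callback;
            this.nextTimestamp = timestamp;
            this.period = period;
        }

        @Override
        public void run() {
            if (serviceStatus.get() != STATUS_ALIVE) {
                return; // quiesced or shut down: skip the callback
            }
            try {
                callback.onProcessingTime(nextTimestamp);
            } catch (Exception ex) {
                exceptionHandler.accept(ex); // report, but keep the schedule going
            }
            nextTimestamp += period;
        }
    }

    static String demo() {
        AtomicInteger status = new AtomicInteger(STATUS_ALIVE);
        List<Long> seen = new ArrayList<>();
        List<String> errors = new ArrayList<>();
        ScheduledTask task = new ScheduledTask(
                status,
                ex -> errors.add(ex.getMessage()),
                time -> {
                    seen.add(time);
                    if (time == 1200L) {
                        throw new Exception("boom at " + time);
                    }
                },
                1000L,
                200L);
        task.run(); // callback(1000)
        task.run(); // callback(1200) throws; the handler records it
        task.run(); // callback(1400)
        status.set(STATUS_QUIESCED);
        task.run(); // skipped
        return seen + " " + errors;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [1000, 1200, 1400] [boom at 1200]
    }
}
```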
A typical use of ProcessingTimeService is emitting periodic watermarks.
The TimestampsAndWatermarksOperator class implements the ProcessingTimeCallback interface and its callback method onProcessingTime:
@Override
public void onProcessingTime(long timestamp) throws Exception {
// (1) Call the watermark generator's onPeriodicEmit method to emit a watermark
watermarkGenerator.onPeriodicEmit(wmOutput);
final long now = getProcessingTimeService().getCurrentProcessingTime();
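// Get the ProcessingTimeService held by this operator
// and register a timer, passing this as the callback.
// When the system clock reaches now + watermarkInterval, this onProcessingTime method is called again,
// i.e. (1) runs again every watermarkInterval milliseconds, which produces periodic watermarks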
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
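The same self re-registering pattern can be reproduced outside Flink. In this sketch, `MiniTimerService` and `PeriodicEmitter` are hypothetical stand-ins for the operator's ProcessingTimeService and for TimestampsAndWatermarksOperator. Incrementing a counter replaces `watermarkGenerator.onPeriodicEmit(wmOutput)`. The emitter also stops re-arming after a fixed number of emissions so that the example terminates. Because the timer is re-registered from inside the callback, the next timer is armed only after the current emission has finished.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PeriodicEmitDemo {

    interface ProcessingTimeCallback {
        void onProcessingTime(long time) throws Exception;
    }

    // Hypothetical minimal timer service working on absolute millisecond timestamps.
    static final class MiniTimerService {
        final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);

        long getCurrentProcessingTime() {
            return System.currentTimeMillis();
        }

        void registerTimer(long timestamp, ProcessingTimeCallback target) {
            long delay = Math.max(0L, timestamp - getCurrentProcessingTime());
            executor.schedule(
                    () -> {
                        try {
                            target.onProcessingTime(timestamp);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    },
                    delay,
                    TimeUnit.MILLISECONDS);
        }
    }

    // Hypothetical operator that "emits a watermark" every time its timer fires.
    static final class PeriodicEmitter implements ProcessingTimeCallback {
        private final MiniTimerService timers;
        private final long watermarkInterval;
        final AtomicInteger emitted = new AtomicInteger();
        final CountDownLatch remaining;

        PeriodicEmitter(MiniTimerService timers, long watermarkInterval, int maxEmits) {
            this.timers = timers;
            this.watermarkInterval = watermarkInterval;
            this.remaining = new CountDownLatch(maxEmits);
        }

        // Arms the first timer.
        void open() {
            long now = timers.getCurrentProcessingTime();
            timers.registerTimer(now + watermarkInterval, this);
        }

        @Override
        public void onProcessingTime(long timestamp) {
            emitted.incrementAndGet(); // stands in for onPeriodicEmit(...)
            remaining.countDown();
            if (remaining.getCount() > 0) {
                // Re-arm: same shape as registerTimer(now + watermarkInterval, this)
                long now = timers.getCurrentProcessingTime();
                timers.registerTimer(now + watermarkInterval, this);
            }
        }
    }

    // Runs until maxEmits emissions (or a 5 s safety timeout) and returns the emission count.
    static int run(int maxEmits, long intervalMs) {
        MiniTimerService timers = new MiniTimerService();
        PeriodicEmitter emitter = new PeriodicEmitter(timers, intervalMs, maxEmits);
        emitter.open();
        try {
            emitter.remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            timers.executor.shutdownNow();
        }
        return emitter.emitted.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 10)); // 3
    }
}
```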
That's the end of this article. Thanks for reading!



