Timer(定时器)是 Flink Streaming API 提供的用于感知并利用 Processing Time/Event Time 变化的机制。Ververica blog上给出的描述如下:
Timers are what make Flink streaming applications reactive and adaptable to processing and event time changes.
对于普通用户来说,最常见的显式利用 Timer 的地方就是 KeyedProcessFunction。我们在其 processElement() 方法中注册 Timer,然后覆写其 onTimer() 方法作为 Timer 触发时的回调逻辑。根据时间特征的不同:
处理时间——调用 Context.timerService().registerProcessingTimeTimer() 注册;onTimer() 在系统时间戳达到 Timer设定的时间戳时触发。事件时间——调用 Context.timerService().registerEventTimeTimer() 注册;onTimer() 在 Flink 内部水印达到或超过 Timer 设定的时间戳时触发。 TimerService
@PublicEvolving
public interface TimerService {
String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";
String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";
// 返回当前的系统时间
long currentProcessingTime();
// 返回当前的 event time 水印
long currentWatermark();
// 注册 Processing Time 定时器
// 当系统时间超过给定的时间后,定时器会被触发
void registerProcessingTimeTimer(long time);
// 注册 Event Time 定时器
// 当系统水印超过给定的时间后,定时器会被触发
void registerEventTimeTimer(long time);
// 删除 Processing Time 定时器
void deleteProcessingTimeTimer(long time);
// 删除 Event Time 定时器
void deleteEventTimeTimer(long time);
}
Flink Streaming API 内部提供了 SimpleTimerService 类实现了 TimerService 接口。
@Internal
public class SimpleTimerService implements TimerService {
private final InternalTimerService internalTimerService;
public SimpleTimerService(InternalTimerService internalTimerService) {
this.internalTimerService = internalTimerService;
}
@Override
public long currentProcessingTime() {
return internalTimerService.currentProcessingTime();
}
@Override
public long currentWatermark() {
return internalTimerService.currentWatermark();
}
@Override
public void registerProcessingTimeTimer(long time) {
internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
}
@Override
public void registerEventTimeTimer(long time) {
internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
}
@Override
public void deleteProcessingTimeTimer(long time) {
internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
}
@Override
public void deleteEventTimeTimer(long time) {
internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
}
}
可以看出来,SimpleTimerService 是通过它持有的 InternalTimerService internalTimerService 来完成定时器相应操作的。
InternalTimerServiceInternalTimerService 也是1个接口,可以认为是 {@link org.apache.flink.streaming.api.TimerService} 的内部增强版本。
InternalTimerService 在注册定时器的时候,需要指定该定时器的 namespace,从而可以对定时器进行分类管理。
@Internal public interface InternalTimerService{ long currentProcessingTime(); long currentWatermark(); void registerProcessingTimeTimer(N namespace, long time); void deleteProcessingTimeTimer(N namespace, long time); void registerEventTimeTimer(N namespace, long time); void deleteEventTimeTimer(N namespace, long time); // 为每个注册的 Event Time 定时器定义1个执行 Function // 定时器服务将在该 Function 调用操作之前为定时器设置 key context void forEachEventTimeTimer(BiConsumerWithException consumer) throws Exception; // 为每个注册的 Processing Time 定时器定义1个执行 Function // 定时器服务将在该 Function 调用操作之前为定时器设置 key context void forEachProcessingTimeTimer(BiConsumerWithException consumer) throws Exception; }
接着看一下 InternalTimerService 接口的实现类。
TestInternalTimerService 主要用于测试(实现较简单),重点关注 InternalTimerServiceImpl。
先看一下构造函数:
InternalTimerServiceImpl( KeyGroupRange localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, KeyGroupedInternalPriorityQueue> processingTimeTimersQueue, KeyGroupedInternalPriorityQueue > eventTimeTimersQueue) { this.keyContext = checkNotNull(keyContext); this.processingTimeService = checkNotNull(processingTimeService); this.localKeyGroupRange = checkNotNull(localKeyGroupRange); this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue); this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue); int startIdx = Integer.MAX_VALUE; for (Integer keyGroupIdx : localKeyGroupRange) { startIdx = Math.min(keyGroupIdx, startIdx); } this.localKeyGroupRangeStartIdx = startIdx; }
ProcessingTimeService processingTimeService 主要用于完成 Processing Time 相关的操作;KeyGroupedInternalPriorityQueue ProcessingTimeService 接口我在另外1篇文章里详细讲过,此处不再赘述。 Flink源码解析系列–ProcessingTimeService定时器 KeyGroupedInternalPriorityQueue 是 Flink 自定义的优先队列接口,其相关实现类原理跟 JDK 的优先队列类似,也是基于最小/大堆来实现的,后续我会专门写1篇文章来讲解。 当 Processing Time 到达优先队列中顶部元素设置的时间时,会执行 onProcessingTime 方法: 没有读这部分源码的时候,我以为当创建 Processing Time 定时器的时候,Flink 会直接将任务提交给线程池等待被执行,这种方式实现起来很简单,但如果生产环境中创建了大量的待执行定时器时,需要较多的线程资源。 而 Flink 的解决方案就很优雅,其将所有注册的定时器通过优先队列管理起来,只关心下一个最先要执行的定时器,并将其赋给 nextTimer,nextTimer 被触发执行 onProcessingTime 的时候,会依次将队列中触发时间不大于自己的依次 poll 出队触发掉。执行完 onProcessingTime 之后若优先队列中还有待执行的定时器,则重新注册下一个需要执行的定时器,循环往复。整个过程只需要启用1个定时线程即可,从而大大节省了线程资源。 可以看到,registerEventTimeTimer 的逻辑很简单,就是创建好 TimerHeapInternalTimer 添加进 eventTimeTimersQueue 优先队列即可。 那定时器如何被触发呢?Processing Time 定时器是通过线程池来触发的,但 Event Time 定时器的触发显然需要依赖水印的推进。 概览下 InternalTimerServiceImpl 方法,可以发现 advanceWatermark 方法和水印相关。 查看一下 advanceWatermark 方法的调用链: 接着往上追溯: 算子基类 AbstractStreamOperator 中处理水印的方法 processWatermark()。当水印到来时,就会按着上述调用链流转到 InternalTimerServiceImpl 中,并触发所有早于水印时间戳的 Timer。 综上,Processing Time 定时器是通过定时线程池推动触发的,Event Time 定时器是通过水印推动触发的。 https://blog.csdn.net/nazeniwaresakini/article/details/104220113@Override
public void registerProcessingTimeTimer(N namespace, long time) {
// 获取优先队列的顶部元素(最先执行的定时器)
InternalTimer
private void onProcessingTime(long time) throws Exception {
nextTimer = null;
InternalTimer
@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(
new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer
@Override
public void advanceWatermark(Watermark watermark) throws Exception {
for (InternalTimerServiceImpl, ?> service : timerServices.values()) {
service.advanceWatermark(watermark.getTimestamp());
}
}
public void processWatermark(Watermark mark) throws Exception {
if (timeServiceManager != null) {
timeServiceManager.advanceWatermark(mark);
}
output.emitWatermark(mark);
}



