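A Timer is a mechanism Flink provides for triggering execution at a scheduled processing time or event time. It is used much like a timing wheel: you register an event with the wheel, and once the agreed time has passed, the wheel emits the event.
Timers come up often in production. For example, after some event occurs we may want to check 5 minutes later whether its state has changed, and then apply the matching business logic. In the express delivery industry, if a courier has not picked up a parcel within 30 minutes of the customer placing an order, a pickup-timeout task is triggered and an urgent dispatch is made, so that the parcel is collected as soon as possible and the customer stays satisfied.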
1.2 Characteristics of timers
- Timers operate on a KeyedStream
- Timers are deduplicated automatically
- Timers are persisted by checkpoints
- Timers can be deleted
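Definition of deduplication: Flink keeps at most one timer per key and timestamp. Registering a timer again for the same key and the same timestamp does not create a second timer, so onTimer() is called only once for it. Timers with different timestamps under the same key coexist.
1.3 How to use timers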
To use timers in Flink, the most common class is KeyedProcessFunction: register a Timer in its processElement() method, then handle the event in onTimer() once the agreed time is reached.
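@Override
public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
    // Note: the generic type parameters were lost in the original listing; <String, Integer> is assumed.
    long time = System.currentTimeMillis();
    long triggerTime = time + 3000; // fire 3 seconds from now
    ctx.timerService().registerProcessingTimeTimer(triggerTime);
    out.collect(new Tuple2<>(value.f0, 1));
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
    super.onTimer(timestamp, ctx, out);
    // TimeUtil is presumably a project-local helper that formats the timestamp
    System.out.println(TimeUtil.getLongToStr(timestamp) + ", " + ctx.getCurrentKey());
}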
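Through the TimerService we can both register and delete timers:
// each method takes the timer's timestamp (long time) as its argument
ctx.timerService().registerProcessingTimeTimer(time);
ctx.timerService().registerEventTimeTimer(time);
ctx.timerService().deleteEventTimeTimer(time);
ctx.timerService().deleteProcessingTimeTimer(time);
2. Timer source code analysis
2.1 Where are user-defined timers stored? How are they maintained and deduplicated?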
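Conclusion first: in the heap-based implementation traced here, user-defined timers are kept in in-memory queues, ordered by their registered timestamp, and deduplicated by the combination of key, namespace and timestamp.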
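To make that conclusion concrete, here is a minimal stand-alone sketch in plain Java. It is not Flink code: the class names `DedupTimerQueue` and `SketchTimer` are hypothetical, and it uses a `PriorityQueue` plus a `HashSet` where Flink uses its own queue classes. It assumes two timers are equal when key, namespace and timestamp all match, and it orders timers by timestamp.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical queue: a heap for ordering plus a set for deduplication. */
final class DedupTimerQueue {
    private final PriorityQueue<SketchTimer> heap = new PriorityQueue<>();
    private final Set<SketchTimer> dedup = new HashSet<>();

    /** Returns true only if an equal timer was not already registered. */
    boolean add(SketchTimer timer) {
        if (!dedup.add(timer)) {
            return false; // duplicate: same key, namespace and timestamp
        }
        heap.add(timer);
        return true;
    }

    /** Returns true if an equal timer existed and was removed. */
    boolean remove(SketchTimer timer) {
        return dedup.remove(timer) && heap.remove(timer);
    }

    /** The timer with the smallest timestamp, or null if the queue is empty. */
    SketchTimer peek() {
        return heap.peek();
    }

    int size() {
        return heap.size();
    }

    public static void main(String[] args) {
        DedupTimerQueue queue = new DedupTimerQueue();
        queue.add(new SketchTimer("order-1", "ns", 3000L));
        queue.add(new SketchTimer("order-1", "ns", 3000L)); // ignored as a duplicate
        queue.add(new SketchTimer("order-1", "ns", 1000L)); // same key, other timestamp: kept
        System.out.println(queue.size() + " timers, head at " + queue.peek().timestamp);
    }
}

/** Hypothetical stand-in for a timer, identified by key, namespace and timestamp. */
final class SketchTimer implements Comparable<SketchTimer> {
    final String key;
    final String namespace;
    final long timestamp;

    SketchTimer(String key, String namespace, long timestamp) {
        this.key = key;
        this.namespace = namespace;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchTimer)) {
            return false;
        }
        SketchTimer other = (SketchTimer) o;
        return timestamp == other.timestamp
                && key.equals(other.key)
                && namespace.equals(other.namespace);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, namespace, timestamp);
    }

    /** Orders timers by timestamp only, so the earliest timer is at the head. */
    @Override
    public int compareTo(SketchTimer other) {
        return Long.compare(timestamp, other.timestamp);
    }
}
```

Registering the same (key, namespace, timestamp) twice leaves a single entry, while a second timestamp under the same key is stored as a separate timer.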
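2.1.1 How user-registered timers are stored: TimerService
In user code a timer is registered with ctx.timerService().registerProcessingTimeTimer(), a method declared in the TimerService interface. The timer-related part of that interface is: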
public interface TimerService {
    void registerProcessingTimeTimer(long time);
    void registerEventTimeTimer(long time);
    void deleteProcessingTimeTimer(long time);
    void deleteEventTimeTimer(long time);
}
2.1.2 Finding the implementation of TimerService
A user-defined KeyedProcessFunction is wrapped internally in a KeyedProcessOperator, so let's look at that class:
private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {

    private final TimerService timerService;

    ContextImpl(KeyedProcessFunction<K, IN, OUT> function, TimerService timerService) {
        function.super();
        this.timerService = checkNotNull(timerService);
    }

    @Override
    public TimerService timerService() {
        return timerService;
    }
}
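ContextImpl is an inner class that extends KeyedProcessFunction.Context. Its timerService is passed in through the constructor, which is called from open():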
@Override
public void open() throws Exception {
    super.open();
    collector = new TimestampedCollector<>(output);

    InternalTimerService<VoidNamespace> internalTimerService =
            getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

    TimerService timerService = new SimpleTimerService(internalTimerService);

    context = new ContextImpl(userFunction, timerService);
    onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}
As shown above, the timerService in ContextImpl wraps the InternalTimerService returned by getInternalTimerService(), so that method is what we look at next.
Note the three arguments passed to getInternalTimerService(): "user-timers", VoidNamespaceSerializer.INSTANCE, and this.
public <K, N> InternalTimerService<N> getInternalTimerService(
        String name,
        TypeSerializer<N> namespaceSerializer,
        Triggerable<K, N> triggerable) {

    checkTimerServiceInitialization();

    // the following casting is to overcome type restrictions.
    KeyedStateBackend<K> keyedStateBackend = getKeyedStateBackend();
    TypeSerializer<K> keySerializer = keyedStateBackend.getKeySerializer();
    InternalTimeServiceManager<K> keyedTimeServiceHandler = (InternalTimeServiceManager<K>) timeServiceManager;
    TimerSerializer<K, N> timerSerializer = new TimerSerializer<>(keySerializer, namespaceSerializer);
    return keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable);
}
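The Javadoc on this method tells us that one operator can have several timer services, each with its own namespace serializer. The services are distinguished by the name passed in ("user-timers" here), and requesting the same name again returns the same instance. The InternalTimerService itself is obtained from keyedTimeServiceHandler.getInternalTimerService(name, timerSerializer, triggerable). Since keyedTimeServiceHandler is an InternalTimeServiceManager, the class that manages all the InternalTimerService instances, that is the next class to read: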
public <N> InternalTimerService<N> getInternalTimerService(
        String name,
        TimerSerializer<K, N> timerSerializer,
        Triggerable<K, N> triggerable) {

    InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService(name, timerSerializer);

    timerService.startTimerService(
            timerSerializer.getKeySerializer(),
            timerSerializer.getNamespaceSerializer(),
            triggerable);

    return timerService;
}

@SuppressWarnings("unchecked")
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, TimerSerializer<K, N> timerSerializer) {
    InternalTimerServiceImpl<K, N> timerService = (InternalTimerServiceImpl<K, N>) timerServices.get(name);
    if (timerService == null) {
        timerService = new InternalTimerServiceImpl<>(
                localKeyGroupRange,
                keyContext,
                processingTimeService,
                createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, timerSerializer),
                createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer));

        timerServices.put(name, timerService);
    }
    return timerService;
}
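At last we have the answer: following getInternalTimerService() shows that the implementation actually returned is InternalTimerServiceImpl.
In registerOrGetTimerService() we can see that two queues are created with createTimerPriorityQueue(), one for processing-time timers and one for event-time timers.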
private <N> KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> createTimerPriorityQueue(
        String name,
        TimerSerializer<K, N> timerSerializer) {
    return priorityQueueSetFactory.create(
            name,
            timerSerializer);
}
Above is the method that creates a queue. The queue holds TimerHeapInternalTimer elements, whose fields are:
@Nonnull
private final K key;

@Nonnull
private final N namespace;

private final long timestamp;

private transient int timerHeapIndex;
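Here key is the key the timer was registered under, timestamp is the trigger time, and timerHeapIndex is the timer's index inside the queue, used to locate it quickly.
The queue object that gets created is a HeapPriorityQueueSet, which provides add and remove operations; they are not shown here.
OK! All of the work above was to answer one question: where are the timers we register stored? In a KeyGroupedInternalPriorityQueue.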
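Since the add and remove code is not shown, the following stand-alone sketch illustrates why an element that remembers its own heap index is useful: removal can go straight to the right slot instead of scanning the array. This is plain Java, not Flink's HeapPriorityQueueSet. The names `IndexedTimerHeap`, `IndexedTimer` and `heapIndex` are hypothetical, and deduplication is simplified to a map keyed by a "key@timestamp" string.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical min-heap ordered by timestamp, with a map for deduplication. */
final class IndexedTimerHeap {
    private final List<IndexedTimer> heap = new ArrayList<>();
    private final Map<String, IndexedTimer> byIdentity = new HashMap<>();

    private static String identity(String key, long timestamp) {
        return key + "@" + timestamp;
    }

    /** Adds a timer; returns false if the same key and timestamp already exist. */
    boolean add(String key, long timestamp) {
        String id = identity(key, timestamp);
        if (byIdentity.containsKey(id)) {
            return false;
        }
        IndexedTimer timer = new IndexedTimer(key, timestamp);
        byIdentity.put(id, timer);
        heap.add(timer);
        timer.heapIndex = heap.size() - 1;
        siftUp(timer.heapIndex);
        return true;
    }

    /** Removes a timer by jumping to its stored index; no linear scan is needed. */
    boolean remove(String key, long timestamp) {
        IndexedTimer timer = byIdentity.remove(identity(key, timestamp));
        if (timer == null) {
            return false;
        }
        int i = timer.heapIndex;
        int last = heap.size() - 1;
        swap(i, last);          // move the doomed element to the end
        heap.remove(last);      // and drop it
        timer.heapIndex = -1;
        if (i < heap.size()) {  // restore heap order for the element now at i
            siftUp(i);
            siftDown(i);
        }
        return true;
    }

    IndexedTimer peek() {
        return heap.isEmpty() ? null : heap.get(0);
    }

    int size() {
        return heap.size();
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (heap.get(parent).timestamp <= heap.get(i).timestamp) {
                return;
            }
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = heap.size();
        while (true) {
            int left = 2 * i + 1;
            int right = left + 1;
            int smallest = i;
            if (left < n && heap.get(left).timestamp < heap.get(smallest).timestamp) {
                smallest = left;
            }
            if (right < n && heap.get(right).timestamp < heap.get(smallest).timestamp) {
                smallest = right;
            }
            if (smallest == i) {
                return;
            }
            swap(i, smallest);
            i = smallest;
        }
    }

    /** Swaps two slots and keeps each element's stored index in sync. */
    private void swap(int a, int b) {
        IndexedTimer ta = heap.get(a);
        IndexedTimer tb = heap.get(b);
        heap.set(a, tb);
        heap.set(b, ta);
        tb.heapIndex = a;
        ta.heapIndex = b;
    }
}

/** Hypothetical timer that remembers its own position in the heap array. */
final class IndexedTimer {
    final String key;
    final long timestamp;
    int heapIndex = -1; // -1 means "not in the heap"

    IndexedTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}
```

The design point is that every swap updates the stored index, so a delete costs one map lookup plus O(log n) sifting.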
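2.2 How do the timers we write get into this queue?
The answer is InternalTimerServiceImpl. As the code above shows, this class is the implementation of the InternalTimerService interface. Here is its code: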
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {

    private final ProcessingTimeService processingTimeService;
    private final KeyContext keyContext;

    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue;
    private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;

    @Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }

    @Override
    public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteProcessingTimeTimer(N namespace, long time) {
        processingTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    @Override
    public void deleteEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.remove(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

    // ... (remaining members omitted)
}
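The code above shows the register and delete operations clearly. Note that for processing time, a wake-up is scheduled with the ProcessingTimeService only when the new timer is earlier than the current head of the queue; event-time timers are simply added to their queue.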
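The rescheduling rule in registerProcessingTimeTimer() can be isolated in a small stand-alone model. This is a hedged sketch in plain Java, not Flink code: `WakeUpScheduler` and its fields are hypothetical, timers are reduced to bare timestamps for a single key, and "scheduling" is just recording the wake-up time instead of calling a real ProcessingTimeService.

```java
import java.util.TreeSet;

/** Hypothetical model: only the earliest pending timer has a scheduled wake-up. */
final class WakeUpScheduler {
    // Sorted and duplicate-free, standing in for the processing-time queue.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Long.MAX_VALUE means "nothing scheduled yet".
    private long scheduledWakeUp = Long.MAX_VALUE;
    // Counts how often the wake-up had to be (re)scheduled.
    private int reschedules = 0;

    void register(long time) {
        Long oldHead = timers.isEmpty() ? null : timers.first();
        if (timers.add(time)) { // false for a duplicate timestamp: nothing to do
            long nextTriggerTime = oldHead != null ? oldHead : Long.MAX_VALUE;
            // Reschedule only if the new timer fires before the current head.
            if (time < nextTriggerTime) {
                scheduledWakeUp = time;
                reschedules++;
            }
        }
    }

    long scheduledWakeUp() {
        return scheduledWakeUp;
    }

    int reschedules() {
        return reschedules;
    }

    int pending() {
        return timers.size();
    }

    public static void main(String[] args) {
        WakeUpScheduler scheduler = new WakeUpScheduler();
        scheduler.register(100L); // first timer: schedule a wake-up at 100
        scheduler.register(200L); // later than the head: no rescheduling
        scheduler.register(50L);  // earlier than the head: move the wake-up to 50
        System.out.println("wake-up at " + scheduler.scheduledWakeUp()
                + " after " + scheduler.reschedules() + " schedules");
    }
}
```

However many timers are pending, the model keeps a single wake-up pointing at the earliest one, which is the effect of the `time < nextTriggerTime` check in the listing.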
2.3 How is a timer triggered, and how does the logic the user wrote in onTimer get executed?
The rest of the processing also takes place in the InternalTimerServiceImpl class.
@Override
public void onProcessingTime(long time) throws Exception {
    // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
    // inside the callback.
    nextTimer = null;

    InternalTimer<K, N> timer;

    while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        processingTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onProcessingTime(timer);
    }

    if (timer != null && nextTimer == null) {
        nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
    }
}
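The method above fires processing-time timers. It takes from the queue, in order, every Timer whose timestamp is less than or equal to time, sets the current key with keyContext.setCurrentKey(timer.getKey()), and calls Triggerable.onProcessingTime(timer) for each one. If timers remain afterwards, it schedules the next wake-up at the timestamp of the new head.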
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;

    InternalTimer<K, N> timer;

    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        eventTimeTimersQueue.poll();
        keyContext.setCurrentKey(timer.getKey());
        triggerTarget.onEventTime(timer);
    }
}
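The method above fires event-time timers. Here timestamps are compared against the watermark: every timer at or before it is polled, the current key is set with keyContext.setCurrentKey(timer.getKey()), and Triggerable.onEventTime(timer) is called for each one. The Triggerable is the KeyedProcessOperator itself (the this passed to getInternalTimerService() in open()), and that is how the call reaches the user's onTimer().
The end.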
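The firing loop shared by both methods can be tried out in isolation. Below is a minimal sketch in plain Java under stated assumptions: `EventTimerDriver`, `Entry` and the `BiConsumer` callback are hypothetical stand-ins for InternalTimerServiceImpl, its timer type and the Triggerable, and deduplication is left out to keep the loop in focus.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Hypothetical model of the "fire everything at or before the watermark" loop. */
final class EventTimerDriver {

    /** A pending timer: the key it belongs to and when it should fire. */
    private static final class Entry {
        final String key;
        final long timestamp;

        Entry(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    private final PriorityQueue<Entry> queue =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.timestamp));
    private final BiConsumer<String, Long> onTimer; // stand-in for the Triggerable
    private String currentKey;                      // stand-in for the key context

    EventTimerDriver(BiConsumer<String, Long> onTimer) {
        this.onTimer = onTimer;
    }

    void register(String key, long timestamp) {
        queue.add(new Entry(key, timestamp));
    }

    /** Polls and fires every timer whose timestamp is at or before the watermark. */
    void advanceWatermark(long watermark) {
        Entry head;
        while ((head = queue.peek()) != null && head.timestamp <= watermark) {
            queue.poll();
            currentKey = head.key; // the callback runs "in the context of" this key
            onTimer.accept(currentKey, head.timestamp);
        }
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        List<String> fired = new ArrayList<>();
        EventTimerDriver driver = new EventTimerDriver((key, ts) -> fired.add(key + "@" + ts));
        driver.register("a", 10L);
        driver.register("b", 5L);
        driver.register("c", 20L);
        driver.advanceWatermark(10L); // fires b, then a; c stays pending
        System.out.println(fired + ", pending=" + driver.pending());
    }
}
```

Timers fire in timestamp order rather than registration order, and a timer later than the watermark stays queued until a later watermark reaches it.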



