栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink源码解析系列--Timer定时器

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink源码解析系列--Timer定时器

Timer(定时器)是 Flink Streaming API 提供的用于感知并利用 Processing Time/Event Time 变化的机制。Ververica blog上给出的描述如下:

Timers are what make Flink streaming applications reactive and adaptable to processing and event time changes.

对于普通用户来说,最常见的显式利用 Timer 的地方就是 KeyedProcessFunction。我们在其 processElement() 方法中注册 Timer,然后覆写其 onTimer() 方法作为 Timer 触发时的回调逻辑。根据时间特征的不同:

处理时间——调用 Context.timerService().registerProcessingTimeTimer() 注册;onTimer() 在系统时间戳达到 Timer设定的时间戳时触发。事件时间——调用 Context.timerService().registerEventTimeTimer() 注册;onTimer() 在 Flink 内部水印达到或超过 Timer 设定的时间戳时触发。 TimerService

@PublicEvolving
public interface TimerService {

    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";

    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";

    // 返回当前的系统时间
    long currentProcessingTime();

    // 返回当前的 event time 水印
    long currentWatermark();
	
	// 注册 Processing Time 定时器
	// 当系统时间超过给定的时间后,定时器会被触发
    void registerProcessingTimeTimer(long time);

	// 注册 Event Time 定时器
	// 当系统水印超过给定的时间后,定时器会被触发
    void registerEventTimeTimer(long time);

    // 删除 Processing Time 定时器
    void deleteProcessingTimeTimer(long time);

    // 删除 Event Time 定时器
    void deleteEventTimeTimer(long time);
}

Flink Streaming API 内部提供了 SimpleTimerService 类实现了 TimerService 接口。

@Internal
public class SimpleTimerService implements TimerService {

    private final InternalTimerService internalTimerService;

    public SimpleTimerService(InternalTimerService internalTimerService) {
        this.internalTimerService = internalTimerService;
    }

    @Override
    public long currentProcessingTime() {
        return internalTimerService.currentProcessingTime();
    }

    @Override
    public long currentWatermark() {
        return internalTimerService.currentWatermark();
    }

    @Override
    public void registerProcessingTimeTimer(long time) {
        internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void registerEventTimeTimer(long time) {
        internalTimerService.registerEventTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteProcessingTimeTimer(long time) {
        internalTimerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, time);
    }

    @Override
    public void deleteEventTimeTimer(long time) {
        internalTimerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, time);
    }
}

可以看出来,SimpleTimerService 是通过它持有的 InternalTimerService internalTimerService 来完成定时器相应操作的。

InternalTimerService

InternalTimerService 也是1个接口,可以认为是 {@link org.apache.flink.streaming.api.TimerService} 的内部增强版本。

InternalTimerService 在注册定时器的时候,需要指定该定时器的 namespace,从而可以对定时器进行分类管理。

@Internal
public interface InternalTimerService {

    long currentProcessingTime();

    long currentWatermark();

    void registerProcessingTimeTimer(N namespace, long time);

    void deleteProcessingTimeTimer(N namespace, long time);

    void registerEventTimeTimer(N namespace, long time);

    void deleteEventTimeTimer(N namespace, long time);

	// 为每个注册的 Event Time 定时器定义1个执行 Function
	// 定时器服务将在该 Function 调用操作之前为定时器设置 key context
    void forEachEventTimeTimer(BiConsumerWithException consumer)
            throws Exception;

    // 为每个注册的 Processing Time 定时器定义1个执行 Function
    // 定时器服务将在该 Function 调用操作之前为定时器设置 key context
    void forEachProcessingTimeTimer(BiConsumerWithException consumer)
            throws Exception;
}

接着看一下 InternalTimerService 接口的实现类。

TestInternalTimerService 主要用于测试(实现较简单),重点关注 InternalTimerServiceImpl。

先看一下构造函数:

InternalTimerServiceImpl(
		KeyGroupRange localKeyGroupRange,
		KeyContext keyContext,
		ProcessingTimeService processingTimeService,
		KeyGroupedInternalPriorityQueue> processingTimeTimersQueue,
		KeyGroupedInternalPriorityQueue> eventTimeTimersQueue) {

	this.keyContext = checkNotNull(keyContext);
	this.processingTimeService = checkNotNull(processingTimeService);
	this.localKeyGroupRange = checkNotNull(localKeyGroupRange);
	this.processingTimeTimersQueue = checkNotNull(processingTimeTimersQueue);
	this.eventTimeTimersQueue = checkNotNull(eventTimeTimersQueue);

	int startIdx = Integer.MAX_VALUE;
	for (Integer keyGroupIdx : localKeyGroupRange) {
		startIdx = Math.min(keyGroupIdx, startIdx);
	}
	this.localKeyGroupRangeStartIdx = startIdx;
}

ProcessingTimeService processingTimeService 主要用于完成 Processing Time 相关的操作;KeyGroupedInternalPriorityQueue> processingTimeTimersQueue 用于存储已注册的 Processing Time 定时器;KeyGroupedInternalPriorityQueue> eventTimeTimersQueue 用于存储已注册的 Event Time 定时器。

ProcessingTimeService 接口我在另外1篇文章里详细讲过,此处不再赘述。

Flink源码解析系列–ProcessingTimeService定时器

KeyGroupedInternalPriorityQueue 是 Flink 自定义的优先队列接口,其相关实现类原理跟 JDK 的优先队列类似,也是基于最小/大堆来实现的,后续我会专门写1篇文章来讲解。

registerProcessingTimeTimer
@Override
public void registerProcessingTimeTimer(N namespace, long time) {
	// 获取优先队列的顶部元素(最先执行的定时器)
	InternalTimer oldHead = processingTimeTimersQueue.peek();
	if (processingTimeTimersQueue.add(
			new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
		// 获取优先队列下一个定时器的触发时间
		long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
		// 若待注册定时器的时间早于优先队列下一个定时器的触发时间
		// 说明待注册定时器更先被触发,则需要将 nextTimer 更新为当前待注册定时器
		if (time < nextTriggerTime) {
			if (nextTimer != null) {
				nextTimer.cancel(false);
			}
			// 基于 ProcessingTimeService 注册新的定时器
			// 传入的回调方法为 InternalTimerServiceImpl 自身定义的 onProcessingTime 方法
			nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
		}
	}
}

当 Processing Time 到达优先队列中顶部元素设置的时间时,会执行 onProcessingTime 方法:

private void onProcessingTime(long time) throws Exception {
	
	nextTimer = null;

	InternalTimer timer;
	// 将优先队列中小于或等于当前系统时间的定时器依次 poll() 出队
	// 然后执行 triggerTarget.onProcessingTime(timer)
	while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
		processingTimeTimersQueue.poll();
		keyContext.setCurrentKey(timer.getKey());
		triggerTarget.onProcessingTime(timer);
	}
	
	// 重新注册下一个需要执行的定时器
	// 这里的 timer 是前面执行 while 的时候取的优先队列的顶元素
	// 若 timer != null 说明还有待执行的定时器需要处理
	// 若 nextTimer == null,说明还未定义下一个需要执行的定时器
	if (timer != null && nextTimer == null) {
		// 还是基于 ProcessingTimeService 注册新的定时器
		// ProcessingTimeService 会将该定时器提交给 Flink 自定义的 ScheduledThreadPoolExecutor 线程池,时间到了会执行传入的 this::onProcessingTime 回调方法
		// 进入下一轮 onProcessingTime 
		nextTimer =
				processingTimeService.registerTimer(
						timer.getTimestamp(), this::onProcessingTime);
	}
}

没有读这部分源码的时候,我以为当创建 Processing Time 定时器的时候,Flink 会直接将任务提交给线程池等待被执行,这种方式实现起来很简单,但如果生产环境中创建了大量的待执行定时器时,需要较多的线程资源。

而 Flink 的解决方案就很优雅,其将所有注册的定时器通过优先队列管理起来,只关心下一个最先要执行的定时器,并将其赋给 nextTimer,nextTimer 被触发执行 onProcessingTime 的时候,会依次将队列中触发时间不大于自己的依次 poll 出队触发掉。执行完 onProcessingTime 之后若优先队列中还有待执行的定时器,则重新注册下一个需要执行的定时器,循环往复。整个过程只需要启用1个定时线程即可,从而大大节省了线程资源。

registerEventTimeTimer
@Override
public void registerEventTimeTimer(N namespace, long time) {
	eventTimeTimersQueue.add(
			new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}

可以看到,registerEventTimeTimer 的逻辑很简单,就是创建好 TimerHeapInternalTimer 添加进 eventTimeTimersQueue 优先队列即可。

那定时器如何被触发呢?Processing Time 定时器是通过线程池来触发的,但 Event Time 定时器的触发显然需要依赖水印的推进。

概览下 InternalTimerServiceImpl 方法,可以发现 advanceWatermark 方法和水印相关。

public void advanceWatermark(long time) throws Exception {
	currentWatermark = time;

	InternalTimer timer;
	// 依次取出优先队列中小于或等于 time 的定时器
	// 然后执行 triggerTarget.onEventTime(timer)
	while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
		eventTimeTimersQueue.poll();
		keyContext.setCurrentKey(timer.getKey());
		triggerTarget.onEventTime(timer);
	}
}

查看一下 advanceWatermark 方法的调用链:

@Override
public void advanceWatermark(Watermark watermark) throws Exception {
	for (InternalTimerServiceImpl service : timerServices.values()) {
		service.advanceWatermark(watermark.getTimestamp());
	}
}

接着往上追溯:

public void processWatermark(Watermark mark) throws Exception {
	if (timeServiceManager != null) {
		timeServiceManager.advanceWatermark(mark);
	}
	output.emitWatermark(mark);
}

算子基类 AbstractStreamOperator 中处理水印的方法 processWatermark()。当水印到来时,就会按着上述调用链流转到 InternalTimerServiceImpl 中,并触发所有早于水印时间戳的 Timer。

综上,Processing Time 定时器是通过定时线程池推动触发的,Event Time 定时器是通过水印推动触发的。

参考文献

https://blog.csdn.net/nazeniwaresakini/article/details/104220113

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/781092.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号