本文的Flink源码版本为: 1.15-SNAPSHOT,读者可自行从Github clone.
Flink 提供了 WatermarkGenerator 接口用来"制造"水印:
@Public public interface WatermarkGenerator{ void onEvent(T event, long eventTimestamp, WatermarkOutput output); void onPeriodicEmit(WatermarkOutput output); }
用户可以自定义实现 WatermarkGenerator 接口完成水印的发送,同时,为了方便用户开发,Flink 提供了 NoWatermarksGenerator、BoundedOutOfOrdernessWatermarks 和 WatermarksWithIdleness等默认实现。
NoWatermarksGenerator类如其名,该类的 onEvent 和 onPeriodicEmit 方法均为空实现,即该类不会发送水印。
@Public public final class NoWatermarksGeneratorBoundedOutOfOrdernessWatermarksimplements WatermarkGenerator { @Override public void onEvent(E event, long eventTimestamp, WatermarkOutput output) {} @Override public void onPeriodicEmit(WatermarkOutput output) {} }
由于网络延迟、数据分片等原因,生产环境普遍存在带有混乱时间戳的事件流,如下所示。显示的数字表达的是这些事件实际发生时间的时间戳。到达的第一个事件发生在时间 4,随后发生的事件发生在更早的时间 2,依此类推:
假设我们要对数据流排序,我们想要达到的目的是:应用程序应该在数据流里的事件到达时就有一个算子(我们暂且称之为排序)开始处理事件,这个算子所输出的流是按照时间戳排序好的。
让我们重新审视这些数据:
(1) 我们的排序器看到的第一个事件的时间戳是 4,但是我们不能立即将其作为已排序的流释放。因为我们并不能确定它是有序的,并且较早的事件有可能并未到达。事实上,如果站在上帝视角,我们知道,必须要等到时间戳为 2 的元素到来时,排序器才可以有事件输出。
需要一些缓冲,需要一些时间,但这都是值得的
(2) 接下来的这一步,如果我们选择的是固执的等待,我们永远不会有结果。首先,我们看到了时间戳为 4 的事件,然后看到了时间戳为 2 的事件。可是,时间戳小于 2 的事件接下来会不会到来呢?可能会,也可能不会。再次站在上帝视角,我们知道,我们永远不会看到时间戳 1。
最终,我们必须勇于承担责任,并发出指令,把带有时间戳 2 的事件作为已排序的事件流的开始
(3)然后,我们需要一种策略,该策略定义:对于任何给定时间戳的事件,Flink 何时停止等待较早事件的到来。
这正是 watermarks 的作用 — 它们定义何时停止等待较早的事件。
Flink 中事件时间的处理取决于 watermark 生成器,后者将带有时间戳的特殊元素插入流中形成 watermarks。事件时间 t 的 watermark 代表 t 之前(很可能)都已经到达。
当 watermark 以 2 或更大的时间戳到达时,事件流的排序器应停止等待,并输出 2 作为已经排序好的流。
(4) 我们可能会思考,如何决定 watermarks 的不同生成策略
每个事件都会延迟一段时间后到达,然而这些延迟有所不同,有些事件可能比其他事件延迟得更多。一种简单的方法是假定这些延迟受某个最大延迟的限制。Flink 将此策略称为最大无序边界 (bounded-out-of-orderness) watermark。
@Public public class BoundedOutOfOrdernessWatermarksimplements WatermarkGenerator { // 迄今为止最大的时间戳 private long maxTimestamp; // 允许的最大乱序时间 private final long outOfOrdernessMillis; public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) { checkNotNull(maxOutOfOrderness, "maxOutOfOrderness"); checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative"); this.outOfOrdernessMillis = maxOutOfOrderness.toMillis(); // 初始最大时间戳 this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1; } @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { // 更新最大时间戳 maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { // 发送水印 // 水印为最大时间戳-乱序时间-1 output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); } }
需要注意的是,每来1条事件数据,只是更新了事件流的最大时间戳,并不会直接发送水印。
只有 {@link ExecutionConfig#getAutoWatermarkInterval()} 周期性间隔到了以后,水印才会被发送。
所以如果这个时间间隔设置的比较大,水印的发送会有较大的延迟。
如果各个分片事件流所携带的时间戳是单调递增的,则可将 BoundedOutOfOrdernessWatermarks 的 outOfOrdernessMillis 设置为0,即 AscendingTimestampsWatermarks 类。
@Public public class AscendingTimestampsWatermarksWatermarksWithIdlenessextends BoundedOutOfOrdernessWatermarks { public AscendingTimestampsWatermarks() { super(Duration.ofMillis(0)); } }
Kafka Source 场景,假设 Kafka Topic 的 parttition 数目为10,且 Source 算子的并行度 > 1(假设为10)。此时,Flink 会启动10个消费线程,每个线程负责1个 partition 数据的消费。
同时,每个消费线程还会根据 partition 的数据到达情况生成 watermark,然后 Flink 会取10个线程生成的 watermark 最小值作为最终的水印发送下去。
这里就会存在下面1种情况:
假设上游往 Kafka Topic 发送数据的时候,指定发送到某个 Partition 或者配置了特殊的 hash 分区逻辑,导致该 Topic 的某些 Partition 里完全没有到达数据。
对应到上述场景,假设10个 Partition 里仅有 Partition 0 有数据,而其他 Partition 均没有数据。此时,为了取10个分区的最小 watermark,有数据的那1个分区将一直等待另外9个分区生成水印,从而导致水印生成完全被"卡住"。
为了解决上述问题,Flink 提供了 WatermarksWithIdleness 实现类,当某个分区超过一定时间未有事件数据到达,则将其标记为"空闲"分区,不再参与水印生成,从而避免了"水印取最小"操作被卡住。
@Public public class WatermarksWithIdlenessimplements WatermarkGenerator { // 水印生成器 private final WatermarkGenerator watermarks; // 空闲定时器 private final IdlenessTimer idlenessTimer; // 需要传入1个 WatermarkGenerator 实现 // 和1个空闲超时时间 idleTimeout,当分区超过 idleTimeout 时间未有事件数据到达,则被标记为空闲分区 public WatermarksWithIdleness(WatermarkGenerator watermarks, Duration idleTimeout) { this(watermarks, idleTimeout, SystemClock.getInstance()); } @VisibleForTesting WatermarksWithIdleness(WatermarkGenerator watermarks, Duration idleTimeout, Clock clock) { checkNotNull(idleTimeout, "idleTimeout"); checkArgument( !(idleTimeout.isZero() || idleTimeout.isNegative()), "idleTimeout must be greater than zero"); this.watermarks = checkNotNull(watermarks, "watermarks"); this.idlenessTimer = new IdlenessTimer(clock, idleTimeout); } @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { watermarks.onEvent(event, eventTimestamp, output); // 每来1条事件数据,就触发1次 idlenessTimer 的 activity() 操作 idlenessTimer.activity(); } @Override public void onPeriodicEmit(WatermarkOutput output) { // 首先基于 idlenessTimer 是否处于空闲状态 if (idlenessTimer.checkIfIdle()) { // 若处于空闲状态,则将 WatermarkOutput 标记为空闲,不再参与水印排序 output.markIdle(); } else { watermarks.onPeriodicEmit(output); } } // ------------------------------------------------------------------------ @VisibleForTesting static final class IdlenessTimer { // 时钟,用于计算空闲时间(即间隔多久未收到新数据) private final Clock clock; // 数据累积器(每来1条时间数据,该数字就加1) private long counter; // 上次时间数据对应的 counter private long lastCounter; // 开始时间 private long startOfInactivityNanos; // 最大空闲时间 private final long maxIdleTimeNanos; IdlenessTimer(Clock clock, Duration idleTimeout) { this.clock = clock; long idleNanos; try { idleNanos = idleTimeout.tonanos(); } catch (ArithmeticException ignored) { // long integer overflow idleNanos = Long.MAX_VALUE; } this.maxIdleTimeNanos = idleNanos; } // 每调用1次,counter就加1 public void activity() { counter++; } public boolean checkIfIdle() { // checkIfIdle() 方法因为是在 onPeriodicEmit 方法里被调用的 // 所以 checkIfIdle() 也是被周期性调用的 // 调用的时候,如果发现 counter != lastCounter,则代表在该周期间隔内,有新的事件数据到达 // 此时,更新 lastCounter 为 counter,并将 startOfInactivityNanos 重新置为0 // 并返回 false // **分支1** if (counter != lastCounter) { // activity since the last check. we reset the timer lastCounter = counter; startOfInactivityNanos = 0L; return false; } else // 调用的时候,如果发现 counter == lastCounter,则代表在该周期间隔内,没有新的事件数据到达,分区处于空闲状态 // 此时,如果 startOfInactivityNanos == 0L,代表第1次出现周期间隔内没有新数据到达 // 基于 startOfInactivityNanos = clock.relativeTimeNanos() 开始计时 // 并返回 false // **分支2** if (startOfInactivityNanos == 0L) { // first time that we see no activity since the last periodic probe // begin the timer startOfInactivityNanos = clock.relativeTimeNanos(); return false; // 如果连续2个及以上个周期间隔未有新数据到达,则会执行到该分支 // clock.relativeTimeNanos() - startOfInactivityNanos 计算空闲时间 // 并返回是否大于 maxIdleTimeNanos // **分支3** } else { return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos; } } } }
本文到此结束,感谢阅读!



