栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink源码解析系列-- WatermarkGenerator接口及其常用实现

Flink源码解析系列-- WatermarkGenerator接口及其常用实现

本文的Flink源码版本为: 1.15-SNAPSHOT,读者可自行从Github clone.

Flink 提供了 WatermarkGenerator 接口用来"制造"水印:

@Public
public interface WatermarkGenerator {

    
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    
    void onPeriodicEmit(WatermarkOutput output);
}

用户可以自定义实现 WatermarkGenerator 接口完成水印的发送,同时,为了方便用户开发,Flink 提供了 NoWatermarksGenerator、BoundedOutOfOrdernessWatermarks 和 WatermarksWithIdleness等默认实现。

NoWatermarksGenerator

类如其名,该类的 onEvent 和 onPeriodicEmit 方法均为空实现,即该类不会发送水印。

@Public
public final class NoWatermarksGenerator implements WatermarkGenerator {

    @Override
    public void onEvent(E event, long eventTimestamp, WatermarkOutput output) {}

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {}
}
BoundedOutOfOrdernessWatermarks

由于网络延迟、数据分片等原因,生产环境普遍存在带有混乱时间戳的事件流,如下所示。显示的数字表达的是这些事件实际发生时间的时间戳。到达的第一个事件发生在时间 4,随后发生的事件发生在更早的时间 2,依此类推:

假设我们要对数据流排序,我们想要达到的目的是:应用程序应该在数据流里的事件到达时就有一个算子(我们暂且称之为排序)开始处理事件,这个算子所输出的流是按照时间戳排序好的。

让我们重新审视这些数据:

(1) 我们的排序器看到的第一个事件的时间戳是 4,但是我们不能立即将其作为已排序的流释放。因为我们并不能确定它是有序的,并且较早的事件有可能并未到达。事实上,如果站在上帝视角,我们知道,必须要等到时间戳为 2 的元素到来时,排序器才可以有事件输出。

需要一些缓冲,需要一些时间,但这都是值得的

(2) 接下来的这一步,如果我们选择的是固执的等待,我们永远不会有结果。首先,我们看到了时间戳为 4 的事件,然后看到了时间戳为 2 的事件。可是,时间戳小于 2 的事件接下来会不会到来呢?可能会,也可能不会。再次站在上帝视角,我们知道,我们永远不会看到时间戳 1。

最终,我们必须勇于承担责任,并发出指令,把带有时间戳 2 的事件作为已排序的事件流的开始

(3)然后,我们需要一种策略,该策略定义:对于任何给定时间戳的事件,Flink 何时停止等待较早事件的到来。

这正是 watermarks 的作用 — 它们定义何时停止等待较早的事件。

Flink 中事件时间的处理取决于 watermark 生成器,后者将带有时间戳的特殊元素插入流中形成 watermarks。事件时间 t 的 watermark 代表 t 之前(很可能)都已经到达。

当 watermark 以 2 或更大的时间戳到达时,事件流的排序器应停止等待,并输出 2 作为已经排序好的流。

(4) 我们可能会思考,如何决定 watermarks 的不同生成策略

每个事件都会延迟一段时间后到达,然而这些延迟有所不同,有些事件可能比其他事件延迟得更多。一种简单的方法是假定这些延迟受某个最大延迟的限制。Flink 将此策略称为最大无序边界 (bounded-out-of-orderness) watermark。

@Public
public class BoundedOutOfOrdernessWatermarks implements WatermarkGenerator {

    // 迄今为止最大的时间戳
    private long maxTimestamp;

    // 允许的最大乱序时间
    private final long outOfOrdernessMillis;

    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

        // 初始最大时间戳
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
		// 更新最大时间戳
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
		// 发送水印
		// 水印为最大时间戳-乱序时间-1
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}

需要注意的是,每来1条事件数据,只是更新了事件流的最大时间戳,并不会直接发送水印。

只有 {@link ExecutionConfig#getAutoWatermarkInterval()} 周期性间隔到了以后,水印才会被发送。

所以如果这个时间间隔设置的比较大,水印的发送会有较大的延迟。

如果各个分片事件流所携带的时间戳是单调递增的,则可将 BoundedOutOfOrdernessWatermarks 的 outOfOrdernessMillis 设置为0,即 AscendingTimestampsWatermarks 类。

@Public
public class AscendingTimestampsWatermarks extends BoundedOutOfOrdernessWatermarks {

    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0));
    }
}
WatermarksWithIdleness

Kafka Source 场景,假设 Kafka Topic 的 parttition 数目为10,且 Source 算子的并行度 > 1(假设为10)。此时,Flink 会启动10个消费线程,每个线程负责1个 partition 数据的消费。

同时,每个消费线程还会根据 partition 的数据到达情况生成 watermark,然后 Flink 会取10个线程生成的 watermark 最小值作为最终的水印发送下去。

这里就会存在下面1种情况:

假设上游往 Kafka Topic 发送数据的时候,指定发送到某个 Partition 或者配置了特殊的 hash 分区逻辑,导致该 Topic 的某些 Partition 里完全没有到达数据。

对应到上述场景,假设10个 Partition 里仅有 Partition 0 有数据,而其他 Partition 均没有数据。此时,为了取10个分区的最小 watermark,有数据的那1个分区将一直等待另外9个分区生成水印,从而导致水印生成完全被"卡住"。

为了解决上述问题,Flink 提供了 WatermarksWithIdleness 实现类,当某个分区超过一定时间未有事件数据到达,则将其标记为"空闲"分区,不再参与水印生成,从而避免了"水印取最小"操作被卡住。

@Public
public class WatermarksWithIdleness implements WatermarkGenerator {
	
	// 水印生成器
    private final WatermarkGenerator watermarks;
	// 空闲定时器
    private final IdlenessTimer idlenessTimer;
	
	// 需要传入1个 WatermarkGenerator 实现
	// 和1个空闲超时时间 idleTimeout,当分区超过 idleTimeout 时间未有事件数据到达,则被标记为空闲分区
    public WatermarksWithIdleness(WatermarkGenerator watermarks, Duration idleTimeout) {
        this(watermarks, idleTimeout, SystemClock.getInstance());
    }

    @VisibleForTesting
    WatermarksWithIdleness(WatermarkGenerator watermarks, Duration idleTimeout, Clock clock) {
        checkNotNull(idleTimeout, "idleTimeout");
        checkArgument(
                !(idleTimeout.isZero() || idleTimeout.isNegative()),
                "idleTimeout must be greater than zero");
        this.watermarks = checkNotNull(watermarks, "watermarks");
        this.idlenessTimer = new IdlenessTimer(clock, idleTimeout);
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        watermarks.onEvent(event, eventTimestamp, output);
		// 每来1条事件数据,就触发1次 idlenessTimer 的 activity() 操作
        idlenessTimer.activity();
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
		// 首先基于 idlenessTimer 是否处于空闲状态
        if (idlenessTimer.checkIfIdle()) {
			// 若处于空闲状态,则将 WatermarkOutput 标记为空闲,不再参与水印排序
            output.markIdle();
        } else {
            watermarks.onPeriodicEmit(output);
        }
    }

    // ------------------------------------------------------------------------

    @VisibleForTesting
    static final class IdlenessTimer {

        // 时钟,用于计算空闲时间(即间隔多久未收到新数据)
        private final Clock clock;

        // 数据累积器(每来1条时间数据,该数字就加1)
        private long counter;

        // 上次时间数据对应的 counter 
        private long lastCounter;

        // 开始时间
        private long startOfInactivityNanos;

        // 最大空闲时间
        private final long maxIdleTimeNanos;

        IdlenessTimer(Clock clock, Duration idleTimeout) {
            this.clock = clock;

            long idleNanos;
            try {
                idleNanos = idleTimeout.tonanos();
            } catch (ArithmeticException ignored) {
                // long integer overflow
                idleNanos = Long.MAX_VALUE;
            }

            this.maxIdleTimeNanos = idleNanos;
        }
		
		// 每调用1次,counter就加1
        public void activity() {
            counter++;
        }

        public boolean checkIfIdle() {
			// checkIfIdle() 方法因为是在 onPeriodicEmit 方法里被调用的
			// 所以 checkIfIdle() 也是被周期性调用的
			// 调用的时候,如果发现 counter != lastCounter,则代表在该周期间隔内,有新的事件数据到达
			// 此时,更新 lastCounter 为 counter,并将 startOfInactivityNanos 重新置为0
			// 并返回 false
			// **分支1**
            if (counter != lastCounter) {
                // activity since the last check. we reset the timer
                lastCounter = counter;
                startOfInactivityNanos = 0L;
                return false;
            } else
			// 调用的时候,如果发现 counter == lastCounter,则代表在该周期间隔内,没有新的事件数据到达,分区处于空闲状态
			// 此时,如果 startOfInactivityNanos == 0L,代表第1次出现周期间隔内没有新数据到达
			// 基于 startOfInactivityNanos = clock.relativeTimeNanos() 开始计时
			// 并返回 false
			// **分支2**
            if (startOfInactivityNanos == 0L) {
                // first time that we see no activity since the last periodic probe
                // begin the timer
                startOfInactivityNanos = clock.relativeTimeNanos();
                return false;
			// 如果连续2个及以上个周期间隔未有新数据到达,则会执行到该分支
			// clock.relativeTimeNanos() - startOfInactivityNanos 计算空闲时间
			// 并返回是否大于 maxIdleTimeNanos
			// **分支3**
            } else {
                return clock.relativeTimeNanos() - startOfInactivityNanos > maxIdleTimeNanos;
            }
        }
    }
}


本文到此结束,感谢阅读!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774333.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号