1. Overview
The method for adding watermarks to a DataStream:
assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)
WatermarkStrategy (the watermark strategy) is an interface with one method that must be implemented:
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
The WatermarkGenerator (watermark generator) returned by this method is also an interface, with two methods:
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
void onPeriodicEmit(WatermarkOutput output);
Written out in full:
data.assignTimestampsAndWatermarks(new WatermarkStrategy<TestObj>() {
    @Override
    public WatermarkGenerator<TestObj> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<TestObj>() {
            @Override
            public void onEvent(TestObj testObj, long eventTimestamp, WatermarkOutput watermarkOutput) {
                // called for every event
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                // called periodically
            }
        };
    }
});
onEvent is called for every event and onPeriodicEmit is called periodically; either one can be used to generate watermarks.
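To emit a watermark:
watermarkOutput.emitWatermark(new Watermark(timestamp));
Just call this from either of the two methods above. Watermark has a single constructor taking one long argument: the timestamp the watermark marks.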
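To make the contract of the two callbacks concrete, here is a minimal plain-Java sketch. Wm, WmOutput and WmGenerator are hypothetical stand-ins for Flink's Watermark, WatermarkOutput and WatermarkGenerator, declared locally so the sketch runs without Flink. One generator emits directly from onEvent when an event carries a marker (the "MARK" prefix is an assumption made up for illustration); the other only remembers the largest timestamp in onEvent and emits it from onPeriodicEmit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Flink's Watermark / WatermarkOutput / WatermarkGenerator.
final class Wm {
    final long timestamp;
    Wm(long timestamp) { this.timestamp = timestamp; }
}

interface WmOutput {
    void emitWatermark(Wm wm);
}

interface WmGenerator<T> {
    void onEvent(T event, long eventTimestamp, WmOutput output);
    void onPeriodicEmit(WmOutput output);
}

// Emits from onEvent: a watermark is produced whenever a marker event shows up.
final class PunctuatedGenerator implements WmGenerator<String> {
    @Override
    public void onEvent(String event, long eventTimestamp, WmOutput output) {
        if (event.startsWith("MARK")) {
            output.emitWatermark(new Wm(eventTimestamp));
        }
    }

    @Override
    public void onPeriodicEmit(WmOutput output) {
        // nothing to do: this generator only emits from onEvent
    }
}

// Emits from onPeriodicEmit: onEvent only tracks the largest timestamp seen so far.
final class PeriodicGenerator implements WmGenerator<String> {
    private long max = Long.MIN_VALUE;

    @Override
    public void onEvent(String event, long eventTimestamp, WmOutput output) {
        max = Math.max(max, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WmOutput output) {
        if (max != Long.MIN_VALUE) { // skip until at least one event has been seen
            output.emitWatermark(new Wm(max));
        }
    }
}

// Records emitted watermark timestamps so they can be inspected.
final class CollectingOutput implements WmOutput {
    final List<Long> emitted = new ArrayList<>();

    @Override
    public void emitWatermark(Wm wm) { emitted.add(wm.timestamp); }
}
```

For simplicity the periodic generator emits the maximum itself; the built-in generator in the next section subtracts an out-of-orderness bound first.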
2. Built-in watermark generators
WatermarkStrategy.forMonotonousTimestamps();
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));
Looking at the source, forMonotonousTimestamps() is simply forBoundedOutOfOrderness() with an out-of-orderness bound of 0:
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
    return (ctx) -> new AscendingTimestampsWatermarks<>();
}

public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
    public AscendingTimestampsWatermarks() {
        super(Duration.ofMillis(0L));
    }
}
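A quick arithmetic check of that claim, as a standalone model of the formula rather than the Flink classes (BoundedModel and AscendingModel are hypothetical names): with a zero bound the watermark is always the largest timestamp minus 1 ms, and before any event it is Long.MIN_VALUE.

```java
import java.time.Duration;

// Standalone model of the watermark arithmetic in BoundedOutOfOrdernessWatermarks.
final class BoundedModel {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedModel(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // same initial value as the source, so the first watermark is exactly Long.MIN_VALUE
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1L;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1L;
    }
}

// The monotonous case modelled as a bound of zero.
final class AscendingModel {
    static BoundedModel create() {
        return new BoundedModel(Duration.ZERO);
    }
}
```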
So we only need to look at BoundedOutOfOrdernessWatermarks:
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    private long maxTimestamp;
    private final long outOfOrdernessMillis;

    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        Preconditions.checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        Preconditions.checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Long.MIN_VALUE (decompiled as -9223372036854775808L): before any event, the emitted watermark is Long.MIN_VALUE
        this.maxTimestamp = Long.MIN_VALUE + this.outOfOrdernessMillis + 1L;
    }

    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        this.maxTimestamp = Math.max(this.maxTimestamp, eventTimestamp);
    }

    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L));
    }
}
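Every event updates the largest timestamp seen so far, and each periodic emit produces a watermark equal to that maximum minus the maximum out-of-orderness, minus 1 ms. Consequently, an event that happened earlier but arrives later does not move the watermark: it sees the same watermark as the later event that overtook it. As long as its timestamp lags that maximum by no more than the out-of-orderness bound, it is still above the watermark and is therefore not treated as late.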
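The following plain-Java simulation replays that rule with a 10 s bound. Assumptions: the periodic emit is pretended to run right after every event, and an event counts as late when its timestamp is not greater than the current watermark (the promise a watermark makes; a window operator's real lateness check is per window). OutOfOrdernessSimulation is a hypothetical name, not a Flink class.

```java
import java.time.Duration;

// Standalone simulation of watermark = maxTimestamp - outOfOrderness - 1,
// with a "periodic" emit assumed to happen right after every event.
final class OutOfOrdernessSimulation {
    private final long boundMillis;
    private long maxTimestamp;
    private long watermark = Long.MIN_VALUE;

    OutOfOrdernessSimulation(Duration bound) {
        this.boundMillis = bound.toMillis();
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1L;
    }

    /** Returns true if the event is late, i.e. its timestamp is <= the watermark emitted before it arrived. */
    boolean process(long eventTimestamp) {
        boolean late = eventTimestamp <= watermark;
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        watermark = maxTimestamp - boundMillis - 1L; // simulated periodic emit
        return late;
    }

    long watermark() { return watermark; }
}
```

Feeding events with event times 10 s, 25 s, 18 s, 15 s, 14 s in that arrival order: after 25 s the watermark is 14 999 ms and stays there, the 18 s and 15 s events are not late, and the 14 s event, 11 s behind the maximum, is.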
The interval (in milliseconds) at which onPeriodicEmit is called can be set with:
env.getConfig().setAutoWatermarkInterval(long interval);
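Because the built-in generator only emits in onPeriodicEmit, the watermark advances in steps of this interval rather than on every event. Below is a small plain-Java model of that (not Flink's actual timer service); PeriodicEmitModel is a hypothetical name, and the 200 ms interval and 1 s bound in the test are example values. Events are given as {arrival time, event timestamp} pairs in arrival order.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model: onEvent for everything that arrived before each tick,
// then one onPeriodicEmit per tick of intervalMillis.
final class PeriodicEmitModel {
    static List<Long> run(long[][] events, long intervalMillis, long boundMillis, long untilMillis) {
        List<Long> emitted = new ArrayList<>();
        long maxTs = Long.MIN_VALUE + boundMillis + 1L;
        int i = 0;
        for (long tick = intervalMillis; tick <= untilMillis; tick += intervalMillis) {
            // onEvent for every event that arrived up to this tick
            while (i < events.length && events[i][0] <= tick) {
                maxTs = Math.max(maxTs, events[i][1]);
                i++;
            }
            // onPeriodicEmit: max timestamp minus bound minus 1 ms
            emitted.add(maxTs - boundMillis - 1L);
        }
        return emitted;
    }
}
```

The model records every onPeriodicEmit call, including repeated values when no new event arrived between ticks.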
3. The withIdleness method
Usage:
WatermarkStrategy.forMonotonousTimestamps().withIdleness(Duration.ofMinutes(1))
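This method addresses the following problem: if some partitions receive no data for a while, they generate no watermarks, and because a downstream operator's watermark is the minimum of the watermarks of all its upstream parallel inputs, the overall watermark stops advancing.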
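To see why one silent partition stalls everything, and how leaving an idle input out of the minimum unsticks it, here is a plain-Java model of a downstream operator with several input channels. MinWatermarkModel is a hypothetical name; it deliberately ignores details of Flink's real implementation, such as how a channel that resumes behind the current watermark is re-aligned, and it returns Long.MIN_VALUE when every channel is idle as a simplification.

```java
import java.util.Arrays;

// Standalone model: combined watermark = minimum over channels that are not marked idle.
final class MinWatermarkModel {
    private final long[] watermarks;
    private final boolean[] idle;

    MinWatermarkModel(int channels) {
        watermarks = new long[channels];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no watermark received yet
        idle = new boolean[channels];
    }

    void onWatermark(int channel, long wm) {
        watermarks[channel] = Math.max(watermarks[channel], wm);
        idle[channel] = false; // receiving a watermark makes the channel active again
    }

    void markIdle(int channel) {
        idle[channel] = true;
    }

    /** Minimum watermark over non-idle channels; Long.MIN_VALUE if every channel is idle. */
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int c = 0; c < watermarks.length; c++) {
            if (!idle[c]) {
                min = Math.min(min, watermarks[c]);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```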
How does it solve this? The official documentation doesn't say, so back to the source:
default WatermarkStrategy<T> withIdleness(Duration idleTimeout) {
    Preconditions.checkNotNull(idleTimeout, "idleTimeout");
    Preconditions.checkArgument(!idleTimeout.isZero() && !idleTimeout.isNegative(), "idleTimeout must be greater than zero");
    return new WatermarkStrategyWithIdleness<>(this, idleTimeout);
}

final class WatermarkStrategyWithIdleness<T> implements WatermarkStrategy<T> {
    private static final long serialVersionUID = 1L;
    private final WatermarkStrategy<T> baseStrategy;
    private final Duration idlenessTimeout;

    WatermarkStrategyWithIdleness(WatermarkStrategy<T> baseStrategy, Duration idlenessTimeout) {
        this.baseStrategy = baseStrategy;
        this.idlenessTimeout = idlenessTimeout;
    }

    public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return this.baseStrategy.createTimestampAssigner(context);
    }

    public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        // wrap the base strategy's generator with idleness detection
        return new WatermarksWithIdleness<>(this.baseStrategy.createWatermarkGenerator(context), this.idlenessTimeout);
    }
}

public class WatermarksWithIdleness<T> implements WatermarkGenerator<T> {
    private final WatermarkGenerator<T> watermarks;
    private final WatermarksWithIdleness.IdlenessTimer idlenessTimer;

    public WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout) {
        this(watermarks, idleTimeout, SystemClock.getInstance());
    }

    @VisibleForTesting
    WatermarksWithIdleness(WatermarkGenerator<T> watermarks, Duration idleTimeout, Clock clock) {
        Preconditions.checkNotNull(idleTimeout, "idleTimeout");
        Preconditions.checkArgument(!idleTimeout.isZero() && !idleTimeout.isNegative(), "idleTimeout must be greater than zero");
        this.watermarks = Preconditions.checkNotNull(watermarks, "watermarks");
        this.idlenessTimer = new WatermarksWithIdleness.IdlenessTimer(clock, idleTimeout);
    }

    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        this.watermarks.onEvent(event, eventTimestamp, output);
        this.idlenessTimer.activity(); // called once per event
    }

    public void onPeriodicEmit(WatermarkOutput output) {
        // check whether this input is idle
        if (this.idlenessTimer.checkIfIdle()) {
            output.markIdle(); // mark as idle
        } else {
            this.watermarks.onPeriodicEmit(output); // normal path: delegate to the wrapped generator
        }
    }

    @VisibleForTesting
    static final class IdlenessTimer {
        private final Clock clock;
        private long counter;
        private long lastCounter;
        private long startOfInactivityNanos;
        private final long maxIdleTimeNanos;

        IdlenessTimer(Clock clock, Duration idleTimeout) {
            this.clock = clock;
            long idleNanos;
            try {
                idleNanos = idleTimeout.toNanos();
            } catch (ArithmeticException e) {
                // timeout too large to express in nanoseconds
                idleNanos = Long.MAX_VALUE;
            }
            this.maxIdleTimeNanos = idleNanos;
        }

        public void activity() {
            ++this.counter; // bump the event counter
        }

        public boolean checkIfIdle() {
            // counter differs from the last check: events arrived in between
            if (this.counter != this.lastCounter) {
                // remember the current count
                this.lastCounter = this.counter;
                // reset the start of inactivity to 0
                this.startOfInactivityNanos = 0L;
                // not idle
                return false;
            // counter unchanged (no events since the last check) and the inactivity clock has not started
            } else if (this.startOfInactivityNanos == 0L) {
                // start the inactivity clock at the current system time in ns
                this.startOfInactivityNanos = this.clock.relativeTimeNanos();
                // not idle (yet)
                return false;
            // counter unchanged and the inactivity clock is already running
            } else {
                // idle if the time since inactivity began exceeds the maximum idle time, otherwise not idle
                return this.clock.relativeTimeNanos() - this.startOfInactivityNanos > this.maxIdleTimeNanos;
            }
        }
    }
}
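The comments tell the story: every event bumps a counter, and each periodic emit compares it with the value seen at the previous check. Once no events have arrived for longer than the idle timeout (measured from the first periodic check that saw no new events), the generator calls output.markIdle() instead of emitting a watermark. Downstream operators then leave that idle input out when computing the minimum watermark, so the remaining inputs keep the watermark moving; as soon as an event arrives again, the next check returns "not idle" and normal watermark emission resumes.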
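To check that timing, here is the IdlenessTimer logic copied into plain Java with a hand-driven clock in place of Flink's Clock (IdlenessTimerSketch and advanceClock are hypothetical names). The clock deliberately starts at a non-zero value, because 0 is the "not started" sentinel for startOfInactivityNanos.

```java
import java.time.Duration;

// Plain-Java copy of the IdlenessTimer logic above, driven by a manual clock.
final class IdlenessTimerSketch {
    private final long maxIdleTimeNanos;
    private long nowNanos = 1_000_000L; // manual clock, non-zero because 0 means "inactivity not started"
    private long counter;
    private long lastCounter;
    private long startOfInactivityNanos;

    IdlenessTimerSketch(Duration idleTimeout) {
        long idleNanos;
        try {
            idleNanos = idleTimeout.toNanos();
        } catch (ArithmeticException e) {
            idleNanos = Long.MAX_VALUE; // timeout too large for nanoseconds
        }
        this.maxIdleTimeNanos = idleNanos;
    }

    void advanceClock(Duration d) { nowNanos += d.toNanos(); }

    void activity() { ++counter; }

    boolean checkIfIdle() {
        if (counter != lastCounter) {            // events since the last check
            lastCounter = counter;
            startOfInactivityNanos = 0L;
            return false;
        } else if (startOfInactivityNanos == 0L) { // first quiet check: start the inactivity clock
            startOfInactivityNanos = nowNanos;
            return false;
        } else {                                  // idle only after strictly more than the timeout
            return nowNanos - startOfInactivityNanos > maxIdleTimeNanos;
        }
    }
}
```

With a 1 s timeout and checks every 200 ms, the first quiet check only starts the clock, a check exactly 1 s later still reports "not idle", and the one after that reports idle; a single new event resets everything.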
4. createTimestampAssigner
When using assignTimestampsAndWatermarks, WatermarkStrategy has one more method to pay attention to:
default TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context)
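For sources such as Kafka/Kinesis you don't need to implement this method, because the timestamp can be taken from the record itself (the default implementation reuses the timestamp already attached to the record). In other cases, such as the example here, you have to implement it yourself; otherwise the records carry no timestamp and an exception is thrown (for example by event-time window assignment).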
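A plain-Java sketch of why the default is not enough for a source without record timestamps. TsAssigner mirrors the shape of Flink's TimestampAssigner#extractTimestamp(T element, long recordTimestamp) but is declared locally so the example runs without Flink; TestObj's eventTime field and the Assigners helper are hypothetical.

```java
// Local stand-in shaped like Flink's TimestampAssigner.
interface TsAssigner<T> {
    long NO_TIMESTAMP = Long.MIN_VALUE; // "record has no timestamp"
    long extractTimestamp(T element, long recordTimestamp);
}

// Hypothetical event type with its event time (epoch millis) in a field.
final class TestObj {
    final String id;
    final long eventTime;

    TestObj(String id, long eventTime) {
        this.id = id;
        this.eventTime = eventTime;
    }
}

final class Assigners {
    // Like the default: reuse whatever timestamp the record already carries.
    static <T> TsAssigner<T> fromRecord() {
        return (element, recordTimestamp) -> recordTimestamp;
    }

    // What you implement yourself when the source attaches no timestamp.
    static TsAssigner<TestObj> fromField() {
        return (element, recordTimestamp) -> element.eventTime;
    }
}
```

For a record without a timestamp, the record-based assigner just passes NO_TIMESTAMP through, while the field-based one yields the real event time. In actual Flink code the field-based version is typically attached with withTimestampAssigner, e.g. WatermarkStrategy.<TestObj>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((event, ts) -> event.eventTime).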



