水位线的概念理解及工作原理:
watermark是用于处理由于网络、背压等原因产生的乱序事件,窗口结束时间+延迟时间=最大waterMark值,即当waterMark值大于的上述计算出的最大waterMark值,该窗口内的数据就属于迟到的数据,无法参与window计算;代码中生成水位线的时间(即调用assignTimestampsAndWatermarks(WatermarkStrategy
真实的项目中不光要考虑延迟时间,还要考虑长时间无数据无法触发计算的情况。
生成水位线的源码解析(flink1.11版本之后):
WatermarkStrategy接口继承了接口TimestampAssignerSupplier
其中继承自WatermarkGeneratorSupplier
1.onEvent()方法:每条记录进来都会调用一次这个方法,入参有3个,第一个是记录,第二个是记录携带的时间,如果注册了时间就会有,第三个参数时水印发射器WatermarkOutputoutput,可以通过这个参数对水印进行发射,用户可以根据自己的业务场景来编写自己的水印生成以及发射逻辑。该方法的重点是每条记录都会调用.
2.onPeriodicEmit():该方法是Flink提供的一个定时器方法,每隔一段时间会调用此方法,入参是WatermarkOutputoutput,用户可以通过这个方法每隔一段时间发送一次水印,当记录数过多时,每条记录都发送一次水印明显不合适,也影响性能,此时可以通过这个方法进行水印的定时发送,而onEvent只记录当前水印而选择不发射出去。该方法的参数配置为env.getConfig().setAutoWatermarkInterval(300L),入参是毫秒数,表示隔多少毫秒向下游算子发送一次水印。
flink也提供了三个WatermarkGenerator的实现类,即flink生成水印的默认三种策略:
1.BoundedOutOfOrdernessWatermark:它的onEvent方法每次从事件数据中提取时间戳,并计算最大时间戳maxTimestamp;在周期触发的onPeriodicEmit方法中,生成的Watermark等于最大时间戳maxTimestamp减去最大乱序长度outOfOrdernessMillis,再减1毫秒。
使用:WatermarkStrategy
2.BoundedOutOfOrdernessWatermark的一个子类,即outOfOrdernessMillis设为0;
使用:与上类似;
3.WatermarkStrategy.noWatermarks():一个算子从多个上游算子中获取数据时,会取上游最小的Watermark作为自身的Watermark,并检测是否满足窗口触发条件。当达不到触发条件,窗口会在内存中缓存大量窗口数据,导致内存不足等问题。
注:flink提供了设置流状态为空闲的withIdleness方法。在设置的超时时间内,当某个数据流一直没有事件数据到达,就标记这个流为空闲。下游算子不需要等待这条数据流产生的Watermark,而取其他上游激活状态的Watermark,来决定是否需要触发窗口计算。
WatermarkStrategy.withIdleness(Duration.ofMillis(5))
上面代码设置超时时间5毫秒,超过这个时间,没有生成Watermark,将流状态设置空闲,当下次有新的Watermark生成并发送到下游时,重新设置为活跃。
Flink水印算子简要流程:
首先TimestampsAndWatermarksOperator算子会在open方法中初始化用户定义的水印逻辑及方式,并且如果需要定时发送水印会,注册一个定时器触发水印定时发送。
其次,当元素到达算子后会调用processElement(StreamRecord
如果需要定时发送水印,则会注册一个定时器,通过onProcessingTime来触发定时器的内容,而内容也十分简单,先调用用户定义的watermarkGenerator.onPeriodicEmit方法发送水印,然后获取当前时间,最后注册当前时间加水印定时发送间隔的定时触发器,等待下次触发该方法。
flink sql中的水位线:
如果需要自定义flink sql中的水位线机制,可以自定义connector,将DynamicTableSource(创建动态源表)的子类中的applyWatermark()方法进行重写,使用自定义的WatermarkGenerator即可。



