1 应用场景
2 什么是窗口
dataStreamSource.flatMap(new MyFlatMapFunction())
.keyBy("")
.timeWindow(Time.seconds(10))
.allowedLateness(Time.seconds(12)) //允许多大的延迟
[00:00:00,00:00:10) [00:00:10,00:00:20) ... [00:00:50,00:01:00)
3 窗口分类
元素的窗口分配算法
TumblingEventTimeWindows窗口的分配算法:
timestamp - (timestamp - offset + windowSize) % windowSize使用event-time模式时,默认提供的window有TumblingEventTimeWindows,SlidingEventTimeWindows,EventTimeSessionWindow等。
4 窗口相关知识点
处理时间
事件时间
生成watermark的方式有两种
-
AssignerWithPeriodicWatermarks :一定时间间隔或者达到一定的记录条数会产生一个watermark。
每隔n毫秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark。
watermark生成的时间间隔(每n毫秒)是通过ExecutionConfig.setAutoWatermarkInterval()
-
AssignerWithPunctuatedWatermarks:基于event time通过一定的逻辑产生watermark,比如收到一个数据就产生一个watermark。
间断式地生成watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理
以上两个接口都继承自 TimestampAssigner
public class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks{ private final long maxOutofOrderness = 5000; private long currentMaxTimestamp; @Override public long extractTimestamp(PacketDescriptor element, long previousElementTimestamp) { long timestamp = element.getTime(); currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp); return timestamp; } @Override //每次调用分配器的getCurrentWatermark()方法时,如果返回的watermark非空且大于前一个watermark,则会发出新的watermark。 public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp - maxOutofOrderness); } }
5 窗口的几个常用算子
-
keyby
-
window
-
trigger
三个默认提供的window operator中,都提供了默认的trigger,我们使用这三个方法时,没有写trigger,直接写window process,如 .reduce()。这是因为这三个window中的getDefaultTrigger()方法使用的是EventTimeTrigger,也就是它给我们提供了默认的trigger。
EventTimeTrigger.java
@Override
//每次数据进入该window时都会触发
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
//此处的目的不是为了为了水位线到了触发,而是为了allow lateness数据到达时触发,此时的数位已经超过了窗口最大事件,但是窗口并没有被回收。
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
//trigger注册的时间达到时触发
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public Trigger window size = 10s
第一个数据(2019-06-03 17:00:02)到达时,此数据所属窗口为[2019-06-03 17:00:00,2019-06-03 17:00:10)
调用MyTimestampsAndWatermarks的getCurrentWatermark()方法计算watermark为:2019-06-03 16:59:57
调用onElement()方法,
window.maxTimestamp() 为:2019-06-03 17:00:10
ctx.getCurrentWatermark()为:2019-06-03 16:59:57
if条件不满足,走else
注册一个trigger,时间为:2019-06-03 17:00:10,底层是一个set,同一个window相同时间的trigger,只会注册一个
第二个数据(2019-06-03 17:00:11)到达时,此数据所属窗口为[2019-06-03 17:00:10,2019-06-03 17:00:20)
调用MyTimestampsAndWatermarks的getCurrentWatermark()方法计算watermark为:2019-06-03 17:00:06
调用onElement()方法,上一个窗口(注意此处不会判断上一个窗口,而是判断当前窗口[2019-06-03 17:00:10,2019-06-03 17:00:20)!!)
window.maxTimestamp() 为:2019-06-03 17:00:10
ctx.getCurrentWatermark()为:2019-06-03 17:00:06
if条件不满足,走else
注册一个trigger,时间为:2019-06-03 17:00:20
第三个数据(2019-06-03 17:00:15)到达时,此数据所属窗口为[2019-06-03 17:00:10,2019-06-03 17:00:20)
调用MyTimestampsAndWatermarks的getCurrentWatermark()方法计算watermark为:2019-06-03 17:00:10
调用onElement()方法,上一个窗口(注意此处不会判断上一个窗口,而是判断当前窗口[2019-06-03 17:00:10,2019-06-03 17:00:20)!!)
window.maxTimestamp() 为:2019-06-03 17:00:10
ctx.getCurrentWatermark()为:2019-06-03 17:00:10
满足if条件,触发小于等于此watermaker的trigger,window开始触发计算,触发后并清除此window的trigger注册的时间。
第四个数据(2019-06-03 17:00:05)到达时,假设此时watermark为:2019-06-03 17:00:30
调用onElement()方法,上一个窗口
window.maxTimestamp() 为:2019-06-03 17:00:10
ctx.getCurrentWatermark()为:2019-06-03 17:00:30
如果window.maxTimestamp() + allowedLateness > watermark,此窗口还未删除,直接触发窗口计算,并把当前数据跟之前此窗口计算的结果运算;如果window.maxTimestamp() + allowedLateness <= watermark,判断到此窗口已删除,则丢弃数据
window触发计算条件
watermark >= endTime
window里有元素
触发window计算(trigger注册时间小于等于watermark的操作)
keyby
window
trigger
三个默认提供的window operator中,都提供了默认的trigger,我们使用这三个方法时,没有写trigger,直接写window process,如 .reduce()。这是因为这三个window中的getDefaultTrigger()方法使用的是EventTimeTrigger,也就是它给我们提供了默认的trigger。
EventTimeTrigger.java
@Override
//每次数据进入该window时都会触发
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
//此处的目的不是为了为了水位线到了触发,而是为了allow lateness数据到达时触发,此时的数位已经超过了窗口最大事件,但是窗口并没有被回收。
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
@Override
//trigger注册的时间达到时触发
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public Trigger window size = 10s
第一个数据(2019-06-03 17:00:02)到达时,此数据所属窗口为[2019-06-03 17:00:00,2019-06-03 17:00:10)
调用MyTimestampsAndWatermarks的getCurrentWatermark()方法计算watermark为:2019-06-03 16:59:57
调用onElement()方法,
window.maxTimestamp() 为:2019-06-03 17:00:10
ctx.getCurrentWatermark()为:2019-06-03 16:59:57
if条件不满足,走else
注册一个trigger,时间为:2019-06-03 17:00:10,底层是一个set,同一个window相同时间的trigger,只会注册一个
第二个数据(2019-06-03 17:00:11)到达时,此数据所属窗口为[2019-06-03 17:00:10,2019-06-03 17:00:20)
调用MyTimestampsAndWatermarks的getCurrentWatermark()方法计算watermark为:2019-06-03 17:00:06
调用onElement()方法,上一个窗口(注意此处不会判断上一个窗口,而是判断当前窗口[2019-06-03 17:00:10,2019-06-03 17:00:20)!!)
window.maxTimestamp() 为:2019-06-03 17:00:10
ctx.getCurrentWatermark()为:2019-06-03 17:00:06
if条件不满足,走else
注册一个trigger,时间为:2019-06-03 17:00:20
第三个数据(2019-06-03 17:00:15)到达时,此数据所属窗口为[2019-06-03 17:00:10,2019-06-03 17:00:20)
调用MyTimestampsAndWatermarks的getCurrentWatermark()方法计算watermark为:2019-06-03 17:00:10
调用onElement()方法,上一个窗口(注意此处不会判断上一个窗口,而是判断当前窗口[2019-06-03 17:00:10,2019-06-03 17:00:20)!!)
window.maxTimestamp() 为:2019-06-03 17:00:10
ctx.getCurrentWatermark()为:2019-06-03 17:00:10
满足if条件,触发小于等于此watermaker的trigger,window开始触发计算,触发后并清除此window的trigger注册的时间。
第四个数据(2019-06-03 17:00:05)到达时,假设此时watermark为:2019-06-03 17:00:30
调用onElement()方法,上一个窗口
window.maxTimestamp() 为:2019-06-03 17:00:10
ctx.getCurrentWatermark()为:2019-06-03 17:00:30
如果window.maxTimestamp() + allowedLateness > watermark,此窗口还未删除,直接触发窗口计算,并把当前数据跟之前此窗口计算的结果运算;如果window.maxTimestamp() + allowedLateness <= watermark,判断到此窗口已删除,则丢弃数据
window触发计算条件
watermark >= endTime
window里有元素
触发window计算(trigger注册时间小于等于watermark的操作)
WindowOperator.java
public void processElement(StreamRecordelement) throws Exception //获取元素对应的窗口 final Collection elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), windowAssignerContext); //if element is handled by none of assigned elementWindows boolean isSkippedElement = true; //获取元素的key final K key = this. getKeyedStateBackend().getCurrentKey(); for (W window: elementWindows) { // 判断窗口时效就continue if (isWindowLate(window)) { continue; } isSkippedElement = false; //元素状态写入 windowState.setCurrentNamespace(window); windowState.add(element.getValue()); triggerContext.key = key; triggerContext.window = window; //判断元素是否触发窗口 TriggerResult triggerResult = triggerContext.onElement(element); // 如果窗口触发,获取state,注入window function if (triggerResult.isFire()) { ACC contents = windowState.get(); if (contents == null) { continue; } emitWindowContents(window, contents); } // 如果判断清理状态,则执行 if (triggerResult.isPurge()) { windowState.clear(); } //注册timer, 窗口结束时间清理window state registerCleanupTimer(window); }
对于迟到太多的数据,其中 isWindowLate(window) 方法,也就是 window.maxTimestamp() + allowedLateness <= watermark ,若小于,则window过期,需要删除window对象,删除window状态;若大于,此窗口还未删除,直接触发窗口计算,并把当前数据跟之前此窗口计算的结果运算做merge操作。
窗口清理逻辑
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
long cleanupTime = window.maxTimestamp() + allowedLateness;
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}
-
evictor
不经常用
-
processapplyreduce



