我们都知道flink有窗口和watermark的概念,当watermark大于窗口的endTime,将触发窗口中数据的计算,watermark是一个不断递增的时间戳,是不断变化的,如果我们假设一个窗口的开始时间和结束时间也是不断变化的,那么watermark就不好触发窗口计算。所以根据我们的假设,内心也是认为一个特定的窗口的开始和结束时间肯定是固定的。
疑问2.窗口是怎么初始化的?如果一个特定的窗口是不会变化的,比如滚动窗口,我们在代码中只需要传入窗口的size,就可以完成窗口的构建,那么窗口的开始时间和结束时间是怎么获取的。
疑问3.如果窗口是固定的,那么第一个窗口开始时间是哪个?例如我们定义一个窗口:window(TumblingProcessingTimeWindows.of(Time.seconds(10))),当我们当前事件的处理时间是2021-12-20 10:17:14,那么这个事件是属于下面窗口中的哪一个?[10:17:10, 10:17:20),[10:17:11, 10:17:21),[10:17:12, 10:17:22),[10:17:14, 10:17:24),…(共十种可能)
我们知道如果我们设置了窗口大小,那么这些窗口都是固定不变的,也就是说这些窗口都是真实存在的,不管你用不用它,那么第一个窗口是哪一个呢?
答案就是所有类型的窗口,第一个窗口的开始时间都是1970-01-01 08:00:00
ok,我们来着上面的疑问向下走。。。
下面列出窗口的示例代码
source.map(new MyMapFunction()).setParallelism(8).keyBy(s -> s.f0) .window(TumblingProcessingTimeWindows.of(Time.seconds(10))) .process(new MyProcessWindowFunction()).setParallelism(8).print();
通过上面TumblingProcessingTimeWindows.of(Time.seconds(10))创建是一个窗口大小是10s的窗口。
public static TumblingProcessingTimeWindows of(Time size) {
return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
}
也就完成了窗口中三个变量的初始化:
private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
if (Math.abs(offset) >= size) {
throw new IllegalArgumentException(
"TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
}
this.size = size;
this.globalOffset = offset;
this.windowStagger = windowStagger;
}
可以看到窗口中的size=10000(10秒), globalOffset=0, windowStagger=WindowStagger.ALIGNED(所有窗口都是从0开始)
现在看下TumblingProcessingTimeWindows分配窗口的功能:assignWindows
@Override public CollectionassignWindows( Object element, long timestamp, WindowAssignerContext context) { final long now = context.getCurrentProcessingTime(); if (staggerOffset == null) { staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size); } long start = TimeWindow.getWindowStartWithOffset( now, (globalOffset + staggerOffset) % size, size); return Collections.singletonList(new TimeWindow(start, start + size)); }
有三个参数element用户发送数据流数据,timestamp该数据流的时间戳,context窗口分配向下文信息,该方法在WindowOperator被调用。该类是处理事件的窗口类,所以该类不是不会使用timestamp该数据流的时间戳。由于windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);返回0,所以staggerOffset就是0。
下面就是重点了,就是获取一个窗口的开始时间,获取开始时间后+窗口大小就是结束时间。
通过以上可知TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);,第二个参数就是0,now是当前时间戳,size是窗口大小。
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
上面的公式可以简化为timestamp - timestamp % windowSize;
也即是当前时间戳-时间戳在窗口多余的时间,肯定是窗口开始时间。
下面具体例子:2021-12-20 10:17:14 通过 2021-12-20 10:17:14 - 4秒,所以该数据落在[10:17:10-10:17:20)的窗口,同样在10:17:10-10:17:19产生的数据都会落在该窗口中。
所以实际上是不存在这些窗口的[10:17:11, 10:17:21),[10:17:12, 10:17:22),[10:17:14, 10:17:24)…
总结:窗口在定义时候,可以说窗口固化了窗口.
因为我们看到通过上面算法,每个时间都会落到特定的窗口。
后面会验证第一个窗口的开始时间是1970-01-01 08:00:00
再举一个例子:窗口大小改为8秒。
下面是一分钟内,各个处理时间对一个的窗口
| id | 处理时间 | 窗口对应的开始时间 |
|---|---|---|
| 0 | 2021-12-20 16:57:13 | 2021-12-20 16:57:12 |
| 1 | 2021-12-20 16:57:14 | 2021-12-20 16:57:12 |
| 2 | 2021-12-20 16:57:15 | 2021-12-20 16:57:12 |
| 3 | 2021-12-20 16:57:16 | 2021-12-20 16:57:12 |
| 4 | 2021-12-20 16:57:17 | 2021-12-20 16:57:12 |
| 5 | 2021-12-20 16:57:18 | 2021-12-20 16:57:12 |
| 6 | 2021-12-20 16:57:19 | 2021-12-20 16:57:12 |
| 7 | 2021-12-20 16:57:20 | 2021-12-20 16:57:20 |
| 8 | 2021-12-20 16:57:21 | 2021-12-20 16:57:20 |
| 9 | 2021-12-20 16:57:22 | 2021-12-20 16:57:20 |
| 10 | 2021-12-20 16:57:23 | 2021-12-20 16:57:20 |
| 11 | 2021-12-20 16:57:24 | 2021-12-20 16:57:20 |
| 12 | 2021-12-20 16:57:25 | 2021-12-20 16:57:20 |
| 13 | 2021-12-20 16:57:26 | 2021-12-20 16:57:20 |
| 14 | 2021-12-20 16:57:27 | 2021-12-20 16:57:20 |
| 15 | 2021-12-20 16:57:28 | 2021-12-20 16:57:28 |
| 16 | 2021-12-20 16:57:29 | 2021-12-20 16:57:28 |
| 17 | 2021-12-20 16:57:30 | 2021-12-20 16:57:28 |
| 18 | 2021-12-20 16:57:31 | 2021-12-20 16:57:28 |
| 19 | 2021-12-20 16:57:32 | 2021-12-20 16:57:28 |
| 20 | 2021-12-20 16:57:33 | 2021-12-20 16:57:28 |
| 21 | 2021-12-20 16:57:34 | 2021-12-20 16:57:28 |
| 22 | 2021-12-20 16:57:35 | 2021-12-20 16:57:28 |
| 23 | 2021-12-20 16:57:36 | 2021-12-20 16:57:36 |
| 24 | 2021-12-20 16:57:37 | 2021-12-20 16:57:36 |
| 25 | 2021-12-20 16:57:38 | 2021-12-20 16:57:36 |
| 26 | 2021-12-20 16:57:39 | 2021-12-20 16:57:36 |
| 27 | 2021-12-20 16:57:40 | 2021-12-20 16:57:36 |
| 28 | 2021-12-20 16:57:41 | 2021-12-20 16:57:36 |
| 29 | 2021-12-20 16:57:42 | 2021-12-20 16:57:36 |
| 30 | 2021-12-20 16:57:43 | 2021-12-20 16:57:36 |
| 31 | 2021-12-20 16:57:44 | 2021-12-20 16:57:44 |
| 32 | 2021-12-20 16:57:45 | 2021-12-20 16:57:44 |
| 33 | 2021-12-20 16:57:46 | 2021-12-20 16:57:44 |
| 34 | 2021-12-20 16:57:47 | 2021-12-20 16:57:44 |
| 35 | 2021-12-20 16:57:48 | 2021-12-20 16:57:44 |
| 36 | 2021-12-20 16:57:49 | 2021-12-20 16:57:44 |
| 37 | 2021-12-20 16:57:50 | 2021-12-20 16:57:44 |
| 38 | 2021-12-20 16:57:51 | 2021-12-20 16:57:44 |
| 39 | 2021-12-20 16:57:52 | 2021-12-20 16:57:52 |
| 40 | 2021-12-20 16:57:53 | 2021-12-20 16:57:52 |
| 41 | 2021-12-20 16:57:54 | 2021-12-20 16:57:52 |
| 42 | 2021-12-20 16:57:55 | 2021-12-20 16:57:52 |
| 43 | 2021-12-20 16:57:56 | 2021-12-20 16:57:52 |
| 44 | 2021-12-20 16:57:57 | 2021-12-20 16:57:52 |
| 45 | 2021-12-20 16:57:58 | 2021-12-20 16:57:52 |
| 46 | 2021-12-20 16:57:59 | 2021-12-20 16:57:52 |
| 47 | 2021-12-20 16:58:00 | 2021-12-20 16:58:00 |
| 48 | 2021-12-20 16:58:01 | 2021-12-20 16:58:00 |
| 49 | 2021-12-20 16:58:02 | 2021-12-20 16:58:00 |
| 50 | 2021-12-20 16:58:03 | 2021-12-20 16:58:00 |
| 51 | 2021-12-20 16:58:04 | 2021-12-20 16:58:00 |
| 52 | 2021-12-20 16:58:05 | 2021-12-20 16:58:00 |
| 53 | 2021-12-20 16:58:06 | 2021-12-20 16:58:00 |
| 54 | 2021-12-20 16:58:07 | 2021-12-20 16:58:00 |
| 55 | 2021-12-20 16:58:08 | 2021-12-20 16:58:08 |
| 56 | 2021-12-20 16:58:09 | 2021-12-20 16:58:08 |
| 57 | 2021-12-20 16:58:10 | 2021-12-20 16:58:08 |
| 58 | 2021-12-20 16:58:11 | 2021-12-20 16:58:08 |
| 59 | 2021-12-20 16:58:12 | 2021-12-20 16:58:08 |
用上面的时间验证第一个窗口的开始时间。例如上面第一个时间是2021-12-20 16:57:13那么也就是1639990633000,所以(1639990633000-0)/8=204998829 (取整),然后204998829 * 8 = 1639990632000 也就是2021-12-20 16:57:12就是这个窗口的开始时间。
ok 上面0就是1970-01-01 08:00:00
我们WindowOperator就是封窗用户WindowFunction处理功能的类,下面贴下代码
@Override public void processElement(StreamRecordelement) throws Exception { final Collection elementWindows = windowAssigner.assignWindows( element.getValue(), element.getTimestamp(), windowAssignerContext); ... }
对于事件时间也是一样处理,只是不是使用当前时间计算所属的窗口,而是使用事件时间
@Override public CollectionassignWindows( Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { if (staggerOffset == null) { staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size); } // Long.MIN_VALUE is currently assigned when no timestamp is present long start = TimeWindow.getWindowStartWithOffset( timestamp, (globalOffset + staggerOffset) % size, size); return Collections.singletonList(new TimeWindow(start, start + size)); } else { throw new RuntimeException( "Record has Long.MIN_VALUE timestamp (= no timestamp marker). " + "Is the time characteristic set to 'ProcessingTime', or did you forget to call " + "'DataStream.assignTimestampsAndWatermarks(...)'?"); } }



