窗口是处理无界流的核心。Windows将流分成大小有限的“桶”,我们可以在这些桶上进行计算。
一个 Flink窗口化程序的一般结构如下所示。第一个代码段引用键控流,而第二个代码段引用非键控流。可以看到,唯一的区别是键控流的keyBy(…)调用和非键控流的window(…)调用。
keyed Windows
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
Non-Keyed Windows
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
在上面,方括号 ([…]) 中的命令是可选的。这表明 Flink 允许您以多种不同的方式自定义窗口逻辑,使其最适合您的需求。
Windows的生命周期简而言之,当第一个应该属于该窗口的元素到达时,就会创建一个窗口,当时间(事件或处理时间)超过它的结束时间戳加上用户指定的允许延迟(参见允许延迟)时,该窗口将被完全删除。Flink保证只删除基于时间的窗口,而不删除其他类型的窗口,例如全局窗口(参见窗口分配器)。例如,使用基于事件时间的窗口策略,每 5 分钟创建一次非重叠(或翻转)窗口,并且允许延迟 1 分钟,flink会在12:00-12:05创建一个新窗口,当水印超过了12:06,那么该窗口就会被删除。
此外,每个窗口都有一个触发器(请参阅 Triggers)和一个函数(ProcessWindowFunction、ReduceFunction 或 AggregateFunction)(请参阅 Window Functions)。该函数将用于对窗口内容进行计算,而Trigger用于控制窗口函数何时开始执行计算。触发策略可能类似于“当窗口中的元素数量超过 4 时”,或“当水印通过窗口末尾时”。触发器还可以决定在创建和删除窗口之间的任何时间清除窗口内的内容。在这种情况下,只清除窗口中的元素,而不是窗口的元数据。这意味着新数据仍然可以添加到该窗口。
除了上述之外,您还可以指定一个 Evictor(参见 Evictors),它能够在触发器触发后以及应用函数之前和/或之后从窗口中删除元素。
键控与非键控窗口首先需要指定您的流是否被键控。这需要您在定义窗口之前指定。使用keyBy会将您的无限流拆分成键控流。在键控流的情况下,传入事件中的任何属性都可以用作键。键控流中允许多个并行任务对窗口中的元素进行计算,因为每个键控流的计算是相互独立的。所有指向相同键的元素将被发送到相同的并行任务中去。
在非键控流的情况下,您的原始流不会被拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行度为 1。
窗口分配器指定了键控流以后,下一步操作就是需要为这个键控流定义窗口。窗口分配器将定义元素将按照怎么样的方式分配到窗口中。这是通过在 window(...)(对于键控流)或 windowAll()(对于非键控流)调用中指定您选择的 WindowAssigner 来完成的。
WindowAssigner 负责将每个传入的元素分配给一个或多个窗口。Flink 为最常见的用例提供了预定义好的窗口分配器,即滚动窗口、滑动窗口、会话窗口和全局窗口。您还可以通过扩展 WindowAssigner 类来实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)都根据时间将元素分配给窗口,时间可以是处理时间或事件时间。
基于时间的窗口有一个开始时间戳(包括)和一个结束时间戳(不包括),它们共同描述了窗口的大小。在代码中,Flink在处理基于时间的窗口时使用TimeWindow,这些窗口具有查询开始和结束时间戳的方法,以及一个额外的方法maxTimestamp(),该方法返回给定窗口所允许的最大时间戳。
滚动窗口滚动窗口分配器将每个元素分配给指定窗口大小的窗口。滚动窗口具有固定大小并且不重叠。例如,如果您指定大小为 5 分钟的滚动窗口,则每五分钟启动一个新窗口,如下图所示。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dUtIuc54-1637131278306)(https://nightlies.apache.org/flink/flink-docs-release-1.14/fig/tumbling-windows.svg)]
以下代码片段展示了如何使用滚动窗口。
DataStreaminput = ...; // tumbling event-time windows input .keyBy( ) .window(TumblingEventTimeWindows.of(Time.seconds(5))) . ( ); // tumbling processing-time windows input .keyBy( ) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) . ( ); // daily tumbling event-time windows offset by -8 hours. input .keyBy( ) .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) . ( );
如上一个示例所示,翻转窗口分配器还采用一个可选的偏移参数,可用于更改窗口的对齐方式。例如,如果没有偏移,每小时翻滚窗口与纪元对齐,即您将获得诸如 1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999 等窗口。如果你想改变它,你可以给出一个偏移量。例如,如果偏移量为 15 分钟,您将获得 1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999 等。偏移的一个重要用例是将窗口调整到时区除了UTC-0。例如,在中国,您必须指定 Time.hours(-8) 的偏移量。
滑动窗口滑动窗口分配器将元素分配给固定长度的窗口。类似于滚动窗口分配器,窗口的大小由窗口大小参数配置。附加的窗口滑动参数控制滑动窗口的启动频率。因此,如果滑动间隔小于窗口大小,滑动窗口可以重叠。在这种情况下,元素被分配给多个窗口。
例如,您可以有大小为 10 分钟的窗口,滑动为5分钟。这样,您每 5 分钟就会获得一个窗口,窗口包含过去 10 分钟内到达的事件,如下图所示。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n4UNfKyY-1637131278307)(https://nightlies.apache.org/flink/flink-docs-release-1.13/fig/sliding-windows.svg)]
以下代码片段展示了如何使用滑动窗口。
DataStreaminput = ...; // sliding event-time windows input .keyBy( ) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) . ( ); // sliding processing-time windows input .keyBy( ) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) . ( ); // sliding processing-time windows offset by -8 hours input .keyBy( ) .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) . ( );
可以使用 Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x) 指定时间间隔。
如上一个示例所示,滑动窗口分配器还采用一个可选的偏移参数,可用于更改窗口的对齐方式。例如,没有偏移,每小时滑动 30 分钟的窗口与纪元对齐,即您将获得诸如 1:00:00.000 - 1:59:59.999、1:30:00.000 - 2:29:59.999 等窗口.如果你想改变它,你可以给出一个偏移量。例如,如果偏移量为 15 分钟,您将获得 1:15:00.000 - 2:14:59.999、1:45:00.000 - 2:44:59.999 等。偏移的一个重要用例是将窗口调整到时区除了UTC-0。例如,在中国,您必须指定 Time.hours(-8) 的偏移量。
会话窗口会话窗口分配器按活动的会话对元素进行分组。与滚动窗口和滑动窗口相比,会话窗口不重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时,它会关闭。会话窗口分配器可以配置为静态会话间隔或会话间隔提取器功能,该功能可以定义不活动时间的间隔。当此时间段到期时,当前会话将关闭,后续元素将分配给新的会话窗口
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vrwuOajm-1637131278308)(https://nightlies.apache.org/flink/flink-docs-release-1.13/fig/session-windows.svg)]
以下代码片段展示了如何使用会话窗口。
DataStreaminput = ...; // event-time session windows with static gap input .keyBy( ) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) . ( ); // event-time session windows with dynamic gap input .keyBy( ) .window(EventTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) . ( ); // processing-time session windows with static gap input .keyBy( ) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10))) . ( ); // processing-time session windows with dynamic gap input .keyBy( ) .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> { // determine and return session gap })) . ( );
可以使用 Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x) 来指定静态间隙。
动态间隔是通过实现 SessionWindowTimeGapExtractor 接口来指定的。
由于会话窗口没有固定的开始和结束,因此它们的计算方式与滚动窗口和滑动窗口不同。在内部,会话窗口算子为每个到达的记录创建一个新窗口,如果它们彼此之间的距离比定义的间隙更近,则将它们合并在一起。为了可合并,会话窗口算子需要合并触发器和合并窗口函数,例如 ReduceFunction、AggregateFunction 或 ProcessWindowFunction
全局窗口全局窗口分配器将具有相同键的所有元素分配给同一个全局窗口。此窗口仅在指定自定义触发器时才会生效。否则,不会执行任何计算,因为全局窗口结束点。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b6G33JQn-1637131278308)(https://nightlies.apache.org/flink/flink-docs-release-1.13/fig/non-windowed.svg)]
以下代码片段展示了如何使用会话窗口。
DataStream窗口函数input = ...; input .keyBy( ) .window(GlobalWindows.create()) . ( );
在定义了窗口分配器之后,我们需要指定我们想要在这些窗口中的每一个元素上执行的计算逻辑。
窗口函数可以是 ReduceFunction、AggregateFunction 或 ProcessWindowFunction 之一。前两个性能更高,因为 Flink 可以在每个窗口的元素到达时增量聚合它们。 ProcessWindowFunction 获取窗口中的所有元素的 Iterable 以及有关元素所属窗口的附加元信息。
ReduceFunctionReduceFunction 指定如何聚合来自输入的两个元素来生成相同类型的输出元素。 Flink 使用 ReduceFunction 来增量聚合窗口的元素。
可以像这样定义和使用 ReduceFunction:
DataStreamAggregateFunction> input = ...; input .keyBy( ) .window( ) .reduce(new ReduceFunction >() { public Tuple2 reduce(Tuple2 v1, Tuple2 v2) { return new Tuple2<>(v1.f0, v1.f1 + v2.f1); } });
AggregationFunction是ReduceFunction的一般版本,它具有三个类型:输入类型(IN)、累加器(ACC)和输出类型(OUT)。输入类型是输入流中元素的类型,AggregationFunction可以将流中的元素添加到累加器中。该方法还具备初始累加器、将两个累加器合并为一个累加器以及从累加器中提取输出的方法。
与ReduceFunction一样,Flink将增量聚合进入窗口中的输入元素。
private static class AverageAggregate
implements AggregateFunction, Tuple2, Double> {
@Override
public Tuple2 createAccumulator() {
return new Tuple2<>(0L, 0L);
}
@Override
public Tuple2 add(Tuple2 value, Tuple2 accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}
@Override
public Double getResult(Tuple2 accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}
@Override
public Tuple2 merge(Tuple2 a, Tuple2 b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}
DataStream> input = ...;
input
.keyBy()
.window()
.aggregate(new AverageAggregate());
ProcessWindowFunction
ProcessWindowFunction能够获取窗口中所有元素的迭代器,以及一个可以访问时间和状态信息的Context对象,这使得它比其他窗口更加具有灵活性。这是以性能和资源消耗为代价的,因为不能增量聚合,而是所有数据放在内存中,知道触发窗口计算才进行聚合。
public abstract class ProcessWindowFunction增量聚合的ProcessWindowFunctionimplements Function { public abstract void process( KEY key, Context context, Iterable elements, Collector out) throws Exception; public abstract class Context implements java.io.Serializable { public abstract W window(); public abstract long currentProcessingTime(); public abstract long currentWatermark(); public abstract KeyedStateStore windowState(); public abstract KeyedStateStore globalState(); } }
ProcessWindowFunction可以与ReduceFunction和AggregationFunction组合使用,以实现达到增量聚合效果。当窗口关闭时ProcessWindowFunction提供聚合结果。
你也可以使用WindowFunction代替ProcessWindowFunction来进行窗口的增加计算。
使用ReduceFunction进行窗口增量计算下面将展示如何将ReduceFunction和ProcessWindowFunction组合起来,以返回窗口中的最小值以及窗口的开始时间。
DataStream使用AggregationFunction进行窗口增量计算input = ...; input .keyBy( ) .window( ) .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); // Function definitions private static class MyReduceFunction implements ReduceFunction { public SensorReading reduce(SensorReading r1, SensorReading r2) { return r1.value() > r2.value() ? r2 : r1; } } private static class MyProcessWindowFunction extends ProcessWindowFunction , String, TimeWindow> { public void process(String key, Context context, Iterable minReadings, Collector > out) { SensorReading min = minReadings.iterator().next(); out.collect(new Tuple2 (context.window().getStart(), min)); } }
下面将展示如何将AggregationFunction和ProcessWindowFunction组合起来,以计算平均值,并且将键以及平均值输出出来。
DataStream触发器> input = ...; input .keyBy( ) .window( ) .aggregate(new AverageAggregate(), new MyProcessWindowFunction()); // Function definitions private static class AverageAggregate implements AggregateFunction , Tuple2 , Double> { @Override public Tuple2 createAccumulator() { return new Tuple2<>(0L, 0L); } @Override public Tuple2 add(Tuple2 value, Tuple2 accumulator) { return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L); } @Override public Double getResult(Tuple2 accumulator) { return ((double) accumulator.f0) / accumulator.f1; } @Override public Tuple2 merge(Tuple2 a, Tuple2 b) { return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1); } } private static class MyProcessWindowFunction extends ProcessWindowFunction , String, TimeWindow> { public void process(String key, Context context, Iterable averages, Collector > out) { Double average = averages.iterator().next(); out.collect(new Tuple2<>(key, average)); } }
触发器决定何时触发窗口计算。每个WindwoAssigner都有一个默认的触发器,如果不满足您的要求,你可以使用trigger(...)自定义一个触发器。
触发器有接口有5方法,允许触发器对不同的事件作出反应:
- 对于添加到窗口中的没个元素,都会触发onElement()方法
- 当注册的事件时间计时器触发时,会触发onEventTime()方法
- 当注册的处理时间计时器触发时,会触发onProcessiongTime()方法
- onMerge()方法与有状态的触发器有关,并在两个触发器的响应窗口合并时,合并两个触发器的状态。
- clear()方法用于在删除窗口时所需执行的任何操作。
对于上述方法有两点需要注意:
- 前三种方法通过返回一个TriggerResult来决定如何处理它们的调用事件。动作可以是以下其中之一:
- CONTINUE:什么也不做,
- FIRE:触发计算,
- PURGE:清除窗口中的元素
- FIRE_AND_PURGE:触发计算,然后清除窗口中的元素。
- 这些方法中的任何一个都可以用来为将来的动作注册处理或事件时间计时器。
一旦触发器确定窗口已准备好进行处理,他就会触发,即返回FIRE或者FIRE_AND_PURGE。给定一个带有ProcessWindowFunction的窗口,那么所有的元素都会传递给ProcessWindowFunction带有ReduceFunction或者AggregationFunction的窗口只会发出他们期望的聚合结果。
当触发器触发时,它可以是FIRE或FIRE_AND_PURGE。FIRE保留窗口的内容,FIRE_AND_PURGE则删除窗口中的内容。默认情况下,内置的触发器只是FIRE,而不清除窗口状态。
WindowAgssigner的默认触发器WindowAssigner的默认触发器适用于许多用例。例如,所有事件时间窗口分配器都有一个EventTimeTrigger作为默认触发器。一旦水印通过窗口的末端,这个触发器就会触发。
GlobalWindow的默认触发器是NeverTrigger,它从不触发。因此,在使用GlobalWindow时,你总是需要定义一个自定义触发器。
通过使用trigger()指定触发器,您将覆盖WindowAssigner的默认触发器。例如,如果你为TumblingEventTimeWindows指定一个CountTrigger,你将不再根据时间的进展而只根据计数来触发窗口。现在,如果您想同时基于时间和计数做出反应,就必须编写自己的自定义触发器。
内置和自定义触发器flink自带了几个内置触发器
- (已经提到)EventTimeTrigger根据水印测量的事件时间进度触发。
- ProcessingTimeRigger根据处理时间触发。
- 当窗口中的元素数超过给定的限制时,CountTrigger将触发。
- PurgingTrigger将另一个触发器作为参数,并将其转换为清除触发器。
Flink的窗口模型允许在指定WindowAssigner和Trigger之外指定一个可选的驱逐器。这可以通过使用evictor(…)方法来完成(见本文开头)。驱逐器能够在触发器触发后以及在应用窗口函数之前和/或之后从窗口中删除元素。要做到这一点,evtor接口有两个方法:
void evictBefore(Iterable> elements, int size, W window, EvictorContext evictorContext); void evictAfter(Iterable > elements, int size, W window, EvictorContext evictorContext);
evectBefore()在窗口函数之前应用的驱逐逻辑,而evectAfter()在窗口函数之后应用的驱逐逻辑。在window函数的应用程序之前被驱逐的元素将不会被window函数处理。
Flink内置了三个预实现的驱逐器。这些都是:
- CountEvictor:从窗口中保留用户指定数量的元素,并从窗口缓冲区的开头丢弃剩余的元素。
- DeltaEvictor:采用 DeltaFunction 和阈值,计算窗口缓冲区中的最后一个元素与其余每个元素之间的差值,并删除差值大于或等于阈值的元素。
- TimeEvictor:以毫秒为单位的interval作为参数,对于给定的窗口,它在其元素中找到最大时间戳 max_ts 并删除所有时间戳小于 max_ts - interval的元素。
当使用事件时间窗口时,可能会发生元素延迟到达的情况,即Flink用来跟踪事件时间进展的水印已经超过了元素所属窗口的结束时间戳。
默认情况下,当水印超过窗口的末尾时,将删除后期元素。但是,Flink允许指定窗口操作符的最大允许延迟时间。允许延迟指定元素在被删除之前可以延迟多少时间,其默认值为0。水印经过窗口结束之后、但在经过窗口结束之前加上允许的延迟时间之前到达的元素仍然被添加到窗口。根据所使用的触发器,迟来但未删除的元素可能会导致窗口再次触发。这就是EventTimeTrigger的情况。
为了使其工作,Flink将保持窗口的状态,直到允许的延迟到期。一旦发生这种情况,Flink就会删除窗口并删除其状态,这在window Lifecycle部分中也有描述。
缺省情况下,允许的延迟时间为0。也就是说,到达水印后面的元素将被删除。
你可以像这样指定一个允许的迟到:
DataStream将延迟数据作为测输出input = ...; input .keyBy( ) .window( ) .allowedLateness(
使用Flink的侧输出特性,您可以获得一个延迟导致丢弃的数据流。
首先需要在窗口流上使用sideOutputLateData(OutputTag)获取延迟数据。然后,你可以在窗口操作的结果上获得侧输出流:
final OutputTag延迟元素考虑lateOutputTag = new OutputTag ("late-data"){}; DataStream input = ...; SingleOutputStreamOperator result = input .keyBy( ) .window( ) .allowedLateness(
当指定大于 0 的允许延迟时,在水印通过窗口末尾后保留窗口及其内容。在这些情况下,当一个迟到但没有被丢弃的元素到达时,它可能会触发窗口的另一次触发。这些触发被称为延迟触发,因为它们是由延迟事件触发的,与主触发相反,主触发是窗口的第一次触发。在会话窗口的情况下,延迟触发可能会进一步导致窗口合并,因为它们可能会“弥合”两个预先存在的未合并窗口之间的差距。
延迟触发发出的元素应视为先前计算的更新结果,即您的数据流将包含同一计算的多个结果。根据您的应用程序,您需要考虑这些重复的结果或对它们进行重复数据删除。
处理窗口结果窗口化操作的结果又是一个数据流,结果元素中没有保留有关窗口化操作的信息,因此如果您想保留有关窗口的元信息,您必须在 ProcessWindowFunction 的结果元素中手动编码该信息。在结果元素上设置的唯一相关信息是元素时间戳。这被设置为处理窗口的最大允许时间戳,即结束时间戳 - 1,因为窗口结束时间戳是独占的。请注意,对于事件时间窗口和处理时间窗口都是如此。即在窗口化操作之后元素总是有一个时间戳,但这可以是事件时间时间戳或处理时间时间戳。对于处理时间窗口,这没有特殊含义,但对于事件时间窗口,这与水印如何与窗口交互一起使具有相同窗口大小的连续窗口操作成为可能。我们将在了解水印如何与窗口交互后介绍这一点。
水印和窗口的交互当水印到达窗口时,这会触发两件事:
- 水印触发最大时间戳(即结束时间戳 - 1)小于新水印的所有窗口的计算
- 水印被转发(按原样)到下游操作
如前所述,计算窗口结果的时间戳的方式以及水印如何与窗口交互允许将连续的窗口操作串在一起。当您想要执行两个连续的窗口化操作时,您想要使用不同的键,但仍希望来自同一个上游窗口的元素最终出现在同一个下游窗口中时,这会很有用。考虑这个例子:
DataStreaminput = ...; DataStream resultsPerKey = input .keyBy( ) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .reduce(new Summer()); DataStream globalResults = resultsPerKey .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new TopKWindowFunction());
在这个例子中,第一个操作的时间窗口 [0, 5) 的结果也将在随后的窗口操作中结束在时间窗口 [0, 5) 中。这允许计算每个键的总和,然后在第二个操作中计算同一窗口内的前 k 个元素
有用的状态大小考虑可以在很长一段时间内(如天、周或月)定义窗口,因此会累积非常大的状态。在估算窗口计算的存储需求时,有几个规则需要记住:
-
Flink为每个元素所属的窗口创建一个副本。因此,滚动窗口会保留每个元素的一个副本(一个元素只属于一个窗口,除非它被延迟删除)。相反,滑动窗口会创建每个元素的多个元素,如Window Assigners部分所述。因此,一个1天的滑动窗口和1秒的滑动窗口可能不是一个好主意。
-
ReduceFunction和AggregateFunction可以显著减少存储需求,因为它们会急切地聚合元素,并且每个窗口只存储一个值。相反,仅仅使用ProcessWindowFunction就需要累积所有元素。
-
使用Evictor可以防止任何预聚合,因为在应用计算之前,窗口的所有元素都必须通过Evictor(请参阅驱逐器)。



