在流处理应用中,数据是连续不断的,因此我们不可能等到所有数据都到了才开始处理。当然我们可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的一分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口,用来收集最近一分钟内的数据,并对这个窗口内的数据进行计算。
流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而 Window 窗口是一种切割无限数据为有限块进行处理的手段。
在 Flink 中, 窗口(window)是处理无界流的核心。窗口把流切割成有限大小的多个"存储 桶"(bucket), 我们在这些桶上进行计算。
Flink 的窗口分为三类:基于时间的窗口、基于元素个数的窗口、全局窗口
一、基于时间的窗口时间窗口(仅考虑 processTime,这里不考虑 eventTime),维护着一个个左闭右开的开始/结束时间戳,这两个时间戳共同限制着窗口的大小,在达到结束时间戳时窗口关闭输出数据。同时基于时间的窗口又分为三类:滚动窗口、滑动窗口、会话窗口。
1.1 滚动窗口滚动窗口有固定的大小, 窗口与窗口之间不会重叠也没有缝隙。比如,如果指定一个长度为 5 分钟的滚动窗口, 当前窗口开始计算, 每 5 分钟启动一个新的窗口。滚动窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
案例:每隔五秒统计一次这五秒内单词个数
代码如下:
package day05;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class Flink01_Window_TimeTumbling {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据并处理
env.socketTextStream("localhost", 1111)
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(x -> x.f0)
// 开窗,创建一个 5s 的滚动窗口
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
.print();
// 执行任务
env.execute();
}
}
结果不好截图,可以掐表在 5s 内输入一批数据。
问题一:窗口的开始时间是什么?
任务启动输入第一条数据的时候,有时很快就出结果,有时却要等待近乎 5s 的时间。这是因为窗口的开始时间并不是第一条数据的时间,而是根据窗口大小进行一个归整,如 5s 的滚动窗口第一条数据是 09:53,那这条数据一定属于 09:50-09:55 这一个窗口内。
下面是滚动窗口分配窗口的源码:
@Override public CollectionassignWindows(Object element, long timestamp, WindowAssignerContext context) { final long now = context.getCurrentProcessingTime(); if (staggerOffset == null) { staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size); } long start = TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size); return Collections.singletonList(new TimeWindow(start, start + size)); }
首先获取当前时间,通过 getWindowStartWithOffset 计算开始时间,最后创建一个窗口。计算开始时间如下:
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
offset 为偏移量默认为 0,这里先忽略,其中 timestamp 为当前的时间戳,windowSize 为窗口大小,那么这个方法最终就是 timestamp - timestamp % windowSize,其结果就是抹去余数,如窗口大小为 5,那么开始时间戳一定是 5 的倍数。加上 windowSize 本身主要是为了考虑时间错为 0 即 1970年这个问题,当 Flink 的时间设定为事件时间时可能会存在时间戳为负数的情况。
问题二:offset 作用?
Flink 默认时间为 0 时区时间,当我们要按天开窗统计数据时,理论上是 00:00 - 00:00,因为我们在东八区实际上统计的范围是 08:00-08:00,这个时候我们就需要给一个8小时的偏移量,其受影响的还是 getWindowStartWithOffset 方法。offset 通常不指定,纯属恶心自己本来整整齐齐的窗口非得往前推 1s。
问题三:窗口为什么是左闭右开?
在 TimeWindow 类中
@Override
public long maxTimestamp() {
return end - 1;
}
结束时间会被 -1,注意这个 1 是毫秒,因此我们算出的窗口结束时间假如是09:55,实际上是09:54.999,09:55 这条数据属于下一个窗口。
1.2 滑动窗口与滚动窗口一样, 滑动窗口也是有固定的长度。另外一个参数我们叫滑动步长, 用来控制滑动窗口启动的频率。所以, 如果滑动步长小于窗口长度, 滑动窗口会重叠。这种情况下, 一个元素可能会被分配到多个窗口中。例如, 滑动窗口长度 10 分钟, 滑动步长 5 分钟, 则, 每 5 分钟会得到一个包含最近 10 分钟的数据。当滑动步长等于窗口大小时它就变成了一个滚动窗口;当滑动步长大于窗口大小?这个…最好不要这么做,会丢失一部分数据的。
代码如下:
package day05;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class Flink02_Window_TimeSliding {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据并处理
env.socketTextStream("localhost", 1111)
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(x -> x.f0)
// 开窗
.window(SlidingProcessingTimeWindows.of(Time.seconds(6L), Time.seconds(3L)))
.sum(1)
.print();
// 执行任务
env.execute();
}
}
像这种窗口大小为 6s,滑动步长为 3s,一条数据会被输出几次呢?答案是 2 次。滑动窗口的窗口分配逻辑如下:
@Override public CollectionassignWindows(Object element, long timestamp, WindowAssignerContext context) { timestamp = context.getCurrentProcessingTime(); List windows = new ArrayList<>((int) (size / slide)); long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide); for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); } return windows; }
其逻辑是首先和滑动窗口一样算出当前数据属于的窗口范围作为最后一个窗口,然后向前拓展一个窗口大小的时间范围,以滑动步长为"窗口大小",构建一系列的窗口。听不懂?假设当前时间戳为 53,窗口大小 6s,滑动步长为 3s,偏移量为 0,最终 lastStart 为 53 - (53 - 0 + 3) % 3 = 51,下面进出循环
| start | timestamp - size | slide | window range |
|---|---|---|---|
| 51 | 47 | 3 | [51,57) |
| 48 | 47 | 3 | [48,54) |
| 45 | 47 | 3 | - |
因此滑动窗口从 48s 开始,大小为 6s,每 2s 滑动一次,53s 的数据会被输出 2 次,若窗口大小不能整除滑动步长,数据输出个数可能是 ceil(窗口大小/滑动步长) 或 ceil(窗口大小/滑动步长) + 1
1.3 会话窗口会话窗口分配器会根据活动的元素进行分组。会话窗口不会有重叠, 与滚动窗口和滑动窗口相比, 会话窗口也没有固定的开启和关闭时间。如果会话窗口有一段时间没有收到数据, 会话窗口会自动关闭, 这段没有收到数据的时间就是会话窗口的 gap(间隔)。
我们可以配置静态的 gap, 也可以通过一个 gap extractor 函数来定义 gap 的长度. 当时间超过了这个 gap, 当前的会话窗口就会关闭, 后序的元素会被分配到一个新的会话窗口。
只要在 5s 内输入了两条及以上的数据这个窗口就永远关不上,代码如下:
package day05;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class Flink03_Window_TimeSession {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据并处理
env.socketTextStream("localhost", 1111)
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(x -> x.f0)
// 开窗
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5L)))
.sum(1)
.print();
// 执行任务
env.execute();
}
}
在特定的业务场景可能有奇效,有时间可以研究一下内部逻辑
二、基于元素个数的窗口按照指定的数据条数生成一个 Window,与时间无关
2.1 滑动窗口默认的 CountWindow 是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到 窗口大小时,就会触发窗口的执行。
代码如下:
package day05;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class Flink04_Window_Count {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据并处理
env.socketTextStream("localhost", 1111)
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(x -> x.f0)
// 开窗
.countWindow(5)
.sum(1)
.print();
// 执行任务
env.execute();
}
}
每五条数据输出一次,哪个窗口先达到 3 个元素, 哪个窗口就关闭。不影响其他的窗口。
2.2 滚动窗口滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size。下面代码中的 sliding_size 设置为了 2,也就是说,每收到两个相同 key 的数据就计算一次,每一次计算的 window 范围最多是 5 个元素。
package day05;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class Flink04_Window_Count {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据并处理
env.socketTextStream("localhost", 1111)
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(x -> x.f0)
// 开窗
.countWindow(5,2)
.sum(1)
.print();
// 执行任务
env.execute();
}
}
三、全局窗口
全局窗口分配器会分配相同 key 的所有元素进入同一个 Global window。这种窗口机制只有指定自定义的触发器时才有用。否则, 不会做任务计算, 因为这种窗口没有能够处理聚集在一起元素的结束点。基于元素个数的窗口本质就是全局窗口,只是定一个触发器。上面说的窗口都是基于 KeyedStream,但 DataStream 也有一个 windowAll 算子,它们的区别在于:globalWindow 将相同的 key 放在一个窗口中,windowAll 将所有数据放在一个窗口中。
四、窗口函数前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由 window function 来负责。一旦窗口关闭, window function 去计算处理窗口中的每个元素。
window function 可以是 ReduceFunction,AggregateFunction,or ProcessWindowFunction 中的任意一种。
ReduceFunction,AggregateFunction 更加高效, 原因就是 Flink 可以对到来的元素进行增量聚合。
ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息。
ProcessWindowFunction 不能被高效执行的原因是 Flink 在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素,但是在计算诸如平均值就不得不用这个。
4.1 增量聚合函数如聚合函数:sum、max、reduce这些都是来一条计算一条,但是缺点是sum、max只能指定字段不够灵活,reduce可以自定义聚合逻辑但是无法改变前后数据类型,即输入是什么类型输出就一定是这个类型,而 AggregateFunction 可以很好的做到。
package org.apache.flink.api.common.functions; import org.apache.flink.annotation.PublicEvolving; import java.io.Serializable; @PublicEvolving public interface AggregateFunctionextends Function, Serializable { ACC createAccumulator(); ACC add(IN value, ACC accumulator); OUT getResult(ACC accumulator); ACC merge(ACC a, ACC b); }
其中 IN:输入数据类型;OUT:输出数据类型;ACC:累加器数据类型
使用 AggregateFunction 实现 sum
考虑到 IN,OUT 为 Tuple2
private static class MyAggregate implements AggregateFunction, Integer, Tuple2 > { @Override public Integer createAccumulator() { return 0; } @Override public Integer add(Tuple2 value, Integer accumulator) { return value.f1 + accumulator; } @Override public Tuple2 getResult(Integer accumulator) { return null; } @Override public Integer merge(Integer a, Integer b) { return a + b; } }
但是发现 getResult 这个方法写不下去,因为获取不到 key 值,这也是增量聚合函数的弊端之一,效率高但无法获取窗口信息,但是 Flink 针对这点也做了优化,既然全窗口函数可以获取窗口信息,那么增量聚合函数将聚合好的数据发送给全窗口函数就好了,全窗口函数之所以效率低是因为它要收集整个窗口的数据后再做计算,那我用增量聚合函数计算好发给你,其相当于你的窗口就一个元素,效率自然就高了。
package org.apache.flink.streaming.api.functions.windowing; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.functions.Function; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; import java.io.Serializable; @Public public interface WindowFunctionextends Function, Serializable { void apply(KEY key, W window, Iterable input, Collector out) throws Exception; }
代码如下:
package day05;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
public class Flink05_Window_Aggregate {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据并处理
env.socketTextStream("localhost", 1111)
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(x -> x.f0)
// 开窗
.window(TumblingProcessingTimeWindows.of(Time.seconds(6L)))
.aggregate(new MyAggregate(), new MyWindowFunction())
.print();
// 执行任务
env.execute();
}
private static class MyAggregate implements AggregateFunction, Integer, Integer> {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(Tuple2 value, Integer accumulator) {
return accumulator + value.f1;
}
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
}
private static class MyWindowFunction implements WindowFunction, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow window, Iterable input, Collector> out) throws Exception {
System.out.println("当前窗口的 key:" + key);
System.out.println("当前窗口是:[ " + new Date(window.getStart()) + ", " + new Date(window.getEnd()) + " )");
// input 迭代器一定是只有一个元素
out.collect(Tuple2.of(key, input.iterator().next()));
}
}
}
注:AggregateFunction 输出对应 WindowFunction 的输入,WindowFunction 的输出为业务需求的输出。
4.2 全窗口函数窗口存在期间收集数据到一个集合,窗口关闭触发计算,常见的有 apple(WindowFunction) 和 process(ProcessWindowFunction),二者区别在于 process 有更丰富的生命周期方法和可以获取运行时上下文
代码如下:
package day05;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
public class Flink06_Window_AllWindow {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据并处理
env.socketTextStream("localhost", 1111)
.flatMap(new FlatMapFunction>() {
@Override
public void flatMap(String value, Collector> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1));
}
}
}).returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(x -> x.f0)
// 开窗
.window(TumblingProcessingTimeWindows.of(Time.seconds(6L)))
.process(new ProcessWindowFunction, Tuple2, String, TimeWindow>() {
@Override
public void process(String key, ProcessWindowFunction, Tuple2, String, TimeWindow>.Context context, Iterable> elements, Collector> out) throws Exception {
int result = 0;
for (Tuple2 element : elements) {
result += element.f1;
}
out.collect(Tuple2.of(key, result));
}
})
.print();
// 执行任务
env.execute();
}
}
从窗口函数实现逻辑来看,全窗口函数是将数据收集到迭代器中,窗口关闭遍历元素处理数据。



