窗口是flink作为实时计算中的一个重要的概念,也称TimeWindow, TimeWindow是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算
flink中的窗口类型滚动窗口 Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中
DataStream> minTempPerWindowStream = dataStream .map(new MapFunction >() { @Override public Tuple2 map(SensorReading value) throws Exception { return new Tuple2<>(value.getId(), value.getTemperature()); } }) .keyBy(data -> data.f0) .timeWindow( Time.seconds(15) ) .minBy(1);
使用TimeWindow相关API时,需要基本明确其计算模型,flink的窗口概念很像是算法中的桶的概念,即把一个时间窗口范围内的数据根据一定的业务字段分组后,归到这个“桶”中,然后基于这个“桶”中的数据做各种计算、归并等业务操作
所以在上述的模板代码中可以发现,使用timeWindow函数开窗之前,先进行keyBy操作,时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。
滑动窗口( SlidingEventTimeWindows ) 滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size 下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素
DataStreamminTempPerWindowStream = dataStream .keyBy(SensorReading::getId) .timeWindow( Time.seconds(15), Time.seconds(5) ) .minBy("temperature")
时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定
下面通过一段代码来演示下滚动时间窗口效果
import com.congge.source.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class WindowEm1 {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//读取kafka的数据
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","IP:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
//读取kafka的数据
DataStreamSource inputStream = env.addSource(
new FlinkKafkaConsumer011<>(
"zcy",
new SimpleStringSchema(),
properties)
);
DataStream dataStream = inputStream.map(new MapFunction() {
@Override
public SensorReading map(String line) throws Exception {
String[] fields = line.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
}
});
System.out.println("准备接收kafka数据......");
DataStream> minTempPerWindowStream = dataStream
.map(new MapFunction>() {
@Override
public Tuple2 map(SensorReading value) throws
Exception {
return new Tuple2<>(value.getId(), value.getTemperature());
}
})
.keyBy(data -> data.f0)
.timeWindow( Time.seconds(7) )
.minBy(1);
minTempPerWindowStream.print("minTempPerWindowStream").setParallelism(1);
env.execute();
}
}
然后开启kafka的生产者的窗口,连续不断的往 zcy 这个topic中发送消息,通过程序控制台的输出结果可以看到效果
上面的结果表示,以7秒为一个滚动时间窗口进行统计,统计7秒内进来的数据最小值
再看另外一个案例,统计7秒内进入窗口内的数据总个数
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//从环境的集合中获取数据
//String path = "E:\code-self\flink_study\src\main\resources\sensor.txt";
//DataStreamSource inputStream = env.readTextFile(path);
//读取kafka的数据
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","IP:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
//读取kafka的数据
DataStreamSource inputStream = env.addSource(
new FlinkKafkaConsumer011<>(
"zcy",
new SimpleStringSchema(),
properties)
);
DataStream dataStream = inputStream.map(new MapFunction() {
@Override
public SensorReading map(String line) throws Exception {
String[] fields = line.split(",");
return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
}
});
System.out.println("准备开始接收数据......");
//测试增量聚合窗口API
DataStream result = dataStream.keyBy("id")
//.countWindow(10,2)
//.window(EventTimeSessionWindows.withGap(Time.seconds(15)))
//.window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
.timeWindow(Time.seconds(5))
.aggregate(new AggregateFunction() {
@Override
public Integer createAccumulator() {
return 0;
}
@Override
public Integer add(SensorReading sensorReading, Integer accumator) {
return accumator + 1;
}
@Override
public Integer getResult(Integer accumator) {
return accumator;
}
@Override
public Integer merge(Integer a, Integer b) {
return a + b;
}
});
result.print("result").setParallelism(1);
env.execute();
}
同样,运行上面的程序,观察效果
上述结果表明,在一个连续不断的接收输入结果的时间范围内,按照窗口7秒为一个统计单位的话,当时间累计到7秒时,会输出该窗口内通过 keyBy分组后的数据总量,比如 6.7 这个数据在一个时间窗口内输出了3次
上面的窗口统计也可以看成是增量聚合,即按照窗口时间的推移,增量统计出这个窗口范围的数据总量
既然有增量,必然也有全量统计,flink同样提供了全量窗口函数,请看下面的案例
//全窗口API
DataStream> result2 = dataStream.keyBy("id")
.timeWindow(Time.seconds(5))
.apply(new WindowFunction, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable iterable, Collector> collector) throws Exception {
String id = tuple.getField(0);
Long windowEnd = timeWindow.getEnd();
Integer count = IteratorUtils.toList(iterable.iterator()).size();
collector.collect(new Tuple3<>(id,windowEnd,count));
}
});
result2.print("result2").setParallelism(1);
全量聚合窗口函数只需要使用 apply这个API,在这个API的匿名函数里面,可以获取到这个统计窗口内更详细的参数信息,因此相对来说,可以做一些更为复杂的操作,比如上例统计输出该窗口内的统计结果,包括数据的 id , 最后的窗口时间,以及统计结果
运行上面的程序,观察统计结果



