滚动窗口将每个元素分配给指定窗口大小的窗口。滚动窗口具有固定大小并且不重叠。例如,如果指定一个大小为 5 分钟的滚动窗口,则将评估当前窗口,并每隔五分钟启动一个新窗口,我们实现对应的例子
Flink(四) :窗口简介_在前进的路上-CSDN博客
二、代码
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.time.Duration;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class WindowsTumblingWindowsTest {
public static void main(String[] args) {
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
producerProperties.setProperty("group.id", "test");
producerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
producerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("test_window", new SimpleStringSchema(), producerProperties);
//Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
consumer.setStartFromGroupOffsets();
//添加数据源
DataStreamSource sourceStream = env.addSource(consumer);
//指定事件时间字段
DataStream
说明
1、设置水位线
WatermarkStrategy
2、TumblingEventTimeWindows窗口时间
TumblingEventTimeWindows.of(Time.seconds(5)
三、执行结果发送数据
{"key":"002","time":1642263310000}
{"key":"002","time":1642263311000}
{"key":"002","time":1642263313000}
{"key":"002","time":1642263315000}
{"key":"002","time":1642263316000}
{"key":"002","time":1642263317000}
{"key":"002","time":1642263318000}
{"key":"002","time":1642263319000}
{"key":"002","time":1642263320000}
窗口统计的执行结果
7> (002,3,1642263310000,1642263315000)
7> (002,5,1642263315000,1642263320000)



