会话窗口按活动会话对元素进行分组。与滚动窗口和滑动窗口相比,会话窗口不重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时,即当出现不活动间隙时,会话窗口将关闭。会话窗口可以配置有会话间隙功能,该功能定义不活动的时间长度。当此期限到期时,当前会话关闭,后续元素被分配到新的会话窗口。实现对应的例子
Flink(四) :窗口简介_在前进的路上-CSDN博客
二、例子
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
public class WindowsSessionTest {
public static void main(String[] args) {
try {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties producerProperties = new Properties();
producerProperties.setProperty("bootstrap.servers", "127.0.0.1:9092");
producerProperties.setProperty("group.id", "test-2");
producerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
producerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("test_window", new SimpleStringSchema(), producerProperties);
//Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数
consumer.setStartFromGroupOffsets();
//添加kafka数据源
DataStreamSource sourceStream = env.addSource(consumer);
DataStream
说明
1、设置水位线
WatermarkStrategy
2、EventTimeSessionWindows窗口时间5秒
EventTimeSessionWindows.of(Time.seconds(5))
三、结果发送数据:
{"key":"001","time":1642263313000}
{"key":"001","time":1642263316000}
{"key":"001","time":1642263320000}
{"key":"001","time":1642263325000}
{"key":"001","time":1642263331000}
{"key":"001","time":1642263338000}
结果说明 1> (001,4,1642263313000,1642263330000) 1> (001,1,1642263331000,1642263336000)



