栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink 常用窗口函数

flink 常用窗口函数

前言

窗口是flink作为实时计算中的一个重要的概念,也称TimeWindow, TimeWindow是将指定时间范围内的所有数据组成一个 window,一次对一个window 里面的所有数据进行计算

flink中的窗口类型

滚动窗口 Flink 默认的时间窗口根据 Processing Time 进行窗口的划分,将 Flink 获取到的数据根据进入 Flink 的时间划分到不同的窗口中

DataStream> minTempPerWindowStream = dataStream
 .map(new MapFunction>() {
 @Override
 public Tuple2 map(SensorReading value) throws 
Exception {
 return new Tuple2<>(value.getId(), value.getTemperature());
 }
 })
 .keyBy(data -> data.f0) 
 .timeWindow( Time.seconds(15) )
 .minBy(1);

使用TimeWindow相关API时,需要基本明确其计算模型,flink的窗口概念很像是算法中的桶的概念,即把一个时间窗口范围内的数据根据一定的业务字段分组后,归到这个“桶”中,然后基于这个“桶”中的数据做各种计算、归并等业务操作

所以在上述的模板代码中可以发现,使用timeWindow函数开窗之前,先进行keyBy操作,时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。

滑动窗口( SlidingEventTimeWindows ) 滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是 window_size,一个是 sliding_size 下面代码中的 sliding_size 设置为了 5s,也就是说,每 5s 就计算输出结果一次,每一次计算的 window 范围是 15s 内的所有元素

DataStream minTempPerWindowStream = dataStream
 .keyBy(SensorReading::getId) 
 .timeWindow( Time.seconds(15), Time.seconds(5) )
 .minBy("temperature")

时间间隔可以通过 Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定

下面通过一段代码来演示下滚动时间窗口效果

import com.congge.source.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class WindowEm1 {

    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //读取kafka的数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","IP:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        //读取kafka的数据
        DataStreamSource inputStream = env.addSource(
                new FlinkKafkaConsumer011<>(
                        "zcy",
                        new SimpleStringSchema(),
                        properties)
        );

        DataStream dataStream = inputStream.map(new MapFunction() {
            @Override
            public SensorReading map(String line) throws Exception {
                String[] fields = line.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });

        System.out.println("准备接收kafka数据......");
        DataStream> minTempPerWindowStream = dataStream
                .map(new MapFunction>() {
                    @Override
                    public Tuple2 map(SensorReading value) throws
                            Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow( Time.seconds(7) )
                .minBy(1);

        minTempPerWindowStream.print("minTempPerWindowStream").setParallelism(1);

        env.execute();
    }

}

然后开启kafka的生产者的窗口,连续不断的往 zcy 这个topic中发送消息,通过程序控制台的输出结果可以看到效果

 上面的结果表示,以7秒为一个滚动时间窗口进行统计,统计7秒内进来的数据最小值

再看另外一个案例,统计7秒内进入窗口内的数据总个数

public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //从环境的集合中获取数据
        //String path = "E:\code-self\flink_study\src\main\resources\sensor.txt";
        //DataStreamSource inputStream = env.readTextFile(path);

        //读取kafka的数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","IP:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        //读取kafka的数据
        DataStreamSource inputStream = env.addSource(
                new FlinkKafkaConsumer011<>(
                        "zcy",
                        new SimpleStringSchema(),
                        properties)
        );

        DataStream dataStream = inputStream.map(new MapFunction() {
            @Override
            public SensorReading map(String line) throws Exception {
                String[] fields = line.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });
        System.out.println("准备开始接收数据......");
        //测试增量聚合窗口API
        DataStream result = dataStream.keyBy("id")
                //.countWindow(10,2)
                //.window(EventTimeSessionWindows.withGap(Time.seconds(15)))
                //.window(TumblingProcessingTimeWindows.of(Time.seconds(15)));
                .timeWindow(Time.seconds(5))
                .aggregate(new AggregateFunction() {
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    @Override
                    public Integer add(SensorReading sensorReading, Integer accumator) {
                        return accumator + 1;
                    }

                    @Override
                    public Integer getResult(Integer accumator) {
                        return accumator;
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });

        result.print("result").setParallelism(1);


        env.execute();
    }

同样,运行上面的程序,观察效果

 

上述结果表明,在一个连续不断的接收输入结果的时间范围内,按照窗口7秒为一个统计单位的话,当时间累计到7秒时,会输出该窗口内通过 keyBy分组后的数据总量,比如 6.7 这个数据在一个时间窗口内输出了3次

上面的窗口统计也可以看成是增量聚合,即按照窗口时间的推移,增量统计出这个窗口范围的数据总量

既然有增量,必然也有全量统计,flink同样提供了全量窗口函数,请看下面的案例

//全窗口API
        DataStream> result2 = dataStream.keyBy("id")
                .timeWindow(Time.seconds(5))
                .apply(new WindowFunction, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable iterable, Collector> collector) throws Exception {
                        String id = tuple.getField(0);
                        Long windowEnd = timeWindow.getEnd();
                        Integer count = IteratorUtils.toList(iterable.iterator()).size();
                        collector.collect(new Tuple3<>(id,windowEnd,count));

                    }
                });
        result2.print("result2").setParallelism(1);

全量聚合窗口函数只需要使用 apply这个API,在这个API的匿名函数里面,可以获取到这个统计窗口内更详细的参数信息,因此相对来说,可以做一些更为复杂的操作,比如上例统计输出该窗口内的统计结果,包括数据的 id , 最后的窗口时间,以及统计结果

运行上面的程序,观察统计结果

 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710153.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号