栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink——窗口(window)

Flink——窗口(window)

一:窗口(window)就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析 window类型: 时间窗口(Time Window) ➢ 滚动时间窗口( Tumbling Windows):将数据依据固定的窗口长度对数据进行切分时间对齐,窗口长度固定,没有重叠 ➢ 滑动时间窗口 ( Sliding Windows):滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。窗口长度固定,可以有重叠 ➢ 会话窗口(Session Windows):由一系列事件组合一个指定时间长度的 timeout 间隙组成,也就是 一段时间没有接收到新数据就会生成新的窗口。时间无对齐 计数窗口(Count Window) ➢ 滚动计数窗口 ➢ 滑动计数窗口 二:窗口函数(window function) • window function 定义了要对窗口中收集的数据做的计算操作,可以分为两类 ➢ 增量聚合函数(incremental aggregation functions) • 每条数据到来就进行计算,保持一个简单的状态 • ReduceFunction, AggregateFunction ➢ 全窗口函数(full window functions) • 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据 • ProcessWindowFunction,WindowFunction 案例

数据:

sensor_1,1547718199,35.8
sensor_1,1547718199,23.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_7,1547718202,3.7
sensor_10,1547718205,38.1
sensor_10,1543228205,32.1
sensor_10,1147248205,40.1

1:时间滚动窗口,聚合函数,全窗口函数

package com.atguigu.window;

import com.atguigu.bean.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.IterableUtils;

public class TimeWindow01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource socketTextStream = env.socketTextStream("hadoop112", 7777);

        DataStream inputStream = socketTextStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
        });
        
        SingleOutputStreamOperator res = inputStream.keyBy("id")
//                .countWindow(10,2);  //参数:窗口大小,时间步长
//                .window(EventTimeSessionWindows.withGap(Time.minutes(1)));
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
                .timeWindow(Time.seconds(10)) //参数:窗口大小,时间步长
                .aggregate(new AggregateFunction() {

                    @Override
                    public Integer createAccumulator() {  //创建一个累加器,初始值
                        return 0;
                    }

                    @Override
                    public Integer add(SensorReading value, Integer accumulator) {  //累加规则
                        return accumulator + 1;
                    }

                    @Override
                    public Integer getResult(Integer accumulator) {  //结果
                        return accumulator;
                    }

                    @Override
                    public Integer merge(Integer a, Integer b) {  //合并
                        return a + b;
                    }
                });


        
        SingleOutputStreamOperator> apply = inputStream.keyBy("id")
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable input, Collector> out) throws Exception {
                        String id = tuple.getField(0);
                        Long windowEndTime =window.getEnd();
                        int size = IteratorUtils.toList(input.iterator()).size();
                        out.collect(new Tuple3<>(id,windowEndTime,size));
                    }
                });

        apply.print();


        env.execute();
    }
}

1:计数滚动窗口,聚合函数

package com.atguigu.window;

import com.atguigu.bean.SensorReading;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountWindow01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource socketTextStream = env.socketTextStream("hadoop112", 7777);

        DataStream inputStream = socketTextStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
        });
        
        SingleOutputStreamOperator aggregate = inputStream.keyBy("id")
                .countWindow(5, 2)
                .aggregate(new AggregateFunction, Double>() {
                    @Override
                    public Tuple2 createAccumulator() {
                        return new Tuple2<>(0.0, 0);
                    }

                    @Override
                    public Tuple2 add(SensorReading value, Tuple2 accumulator) {
                        return new Tuple2<>(accumulator.f0 + value.getTemperature(), accumulator.f1 + 1);
                    }

                    @Override
                    public Double getResult(Tuple2 accumulator) {
                        return accumulator.f0 / accumulator.f1;
                    }

                    @Override
                    public Tuple2 merge(Tuple2 a, Tuple2 b) {
                        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                    }
                });
        aggregate.print();


        env.execute();
    }
}
其它可选 API • .trigger() —— 触发器 ➢ 定义 window 什么时候关闭,触发计算并输出结果 • .evictor() —— 移除器 ➢ 定义移除某些数据的逻辑 • .allowedLateness() —— 允许处理迟到的数据,到时间先输出聚合结果,然后迟到数据来一个计算一次 • .sideOutputLateData() —— 将迟到的数据放入侧输出流 • .getSideOutput() —— 获取侧输出流
        SingleOutputStreamOperator sumStream = inputStream.keyBy("id")
                .timeWindow(Time.seconds(10))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(new OutputTag("late") {
                })
                .sum("temperature");
        
        //获取侧输出流
        DataStream late = sumStream.getSideOutput(new OutputTag<>("late")); 
    

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号