栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Flink】【第六章 Window】

【Flink】【第六章 Window】

一、Window概述

流计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。flink中的window可以理解为水桶,数据理解为水流,水流源源不断,对于DataStream来说是来一个数据处理一条,有了桶之后,将数据装进桶中再处理,在窗口关闭的时候将计算结果输出

窗口=数据桶 ;窗口本质 = 数据 union在一起

二、Window类型

Flink中Window可以分成两大类

(1)CountWindow:按照指定的数据条数生成一个Window,与时间无关。

滚动计数窗口滑动计数窗口

(2)TimeWindow: 按照时间生成Window。

滚动窗口滑动窗口session Window

Spark中窗口只有时间窗口

1.滚动窗口(Tumbling Windows)

(1)将数据依据固定的窗口长度对数据进行切片。
(2)特点:窗口长度固定,窗口之间没有数据重叠;窗口是左闭右开;窗口对齐
滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:

2. 滑动窗口(Sliding Windows)

特点:时间对齐,窗口长度固定,窗口之间有数据重叠;左闭右开;窗口对齐
例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:

一般情况下,窗口大小都是步长的整数倍,否则会造成有的数据属于n个窗口,有的数据属于n+1个窗口;步长越大,窗口之间重叠数据越少。
窗口1h,步长30min,每个数据属于两个窗口
窗口1h,步长5min,每个数据属于12个窗口

3. 会话窗口(Session Windows)

会话窗口属于时间窗口
特点:窗口的长度不固定,而是当隔一段时间没有数据产生,就关闭窗口;时间无对齐

需要指定session gap time 间隙时间,session gap time是窗口之间的最小时间间隔,当gap time内没有数据产生,就截取窗口由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。
三、Window API

开窗操作都是建立在keyedStream,因为做聚合往往是以类聚合。普通的dataStream只有windowAll

1.窗口的创建

Flink提供了 .timeWindow和.countWindow方法,用于定义时间窗口和计数窗口
① 滚动时间窗口 Tumblingtimewindow

.timeWindow(Time.seconds(5))

② 滑动时间窗口 slidingtimewindow

.timeWindow(Time.seconds(5),Time.seconds(2))

③ 会话窗口 session window

.window(EventTimeSessionWindows.withGap(Time.seconds(5));)

④ 滚动计数窗口 Tumblingcountwindow

.countWindow(5)

⑤ 滑动计数窗口 slidingCountWindow

.countWindow(10,2)
1.1 TimeWindow

TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算

1.1.1 滚动窗口

Flink默认的时间窗口根据Processing Time 进行窗口的划分,将Flink获取到的数据根据进入Flink的时间划分到不同的窗口中。

DataStream> minTempPerWindowStream = dataStream
                .map(new MapFunction>() {
                    @Override
                    public Tuple2 map(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                })
                .keyBy(data -> data.f0)
                .timeWindow( Time.seconds(15) )
                .minBy(1);

(1) 时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。
(2) 滚动窗口,按照窗口大小时间计算、输出一次

1.1.2 滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了5s,也就是说,每5s就计算输出结果一次,每一次计算的window范围是15s内的所有元素。

DataStream minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId) 
                .timeWindow( Time.seconds(15), Time.seconds(5) )
                .minBy("temperature");

(1)滑动窗口,按照步长时间输出一次,一共输出 窗口/步长 次
(2)滚动窗口本质上也是滑动窗口,滚动窗口的步长和窗口大小一致,一次只输出一次,且输出间隔为窗口大小

1.1.3 Session Window
    将连续访问的数据放在一个窗口中,当停止时间超过指定时间,开始切窗口计算。session window的创建没有简写,必须通过window()方法,参数是窗口分配器

执行效果:连续输入不会看到结果,停止5s,就能输出结果了。

1.2 CountWindow

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果

1.2.1 滚动窗口

默认的CountWindow是一个滚动窗口,只需要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。

DataStream minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId) 
                .countWindow( 5 )
                .minBy("temperature");

注意: CountWindow是建立在消息个数上的,而且是keyedStream,所以CountWindow的window_size和step_size都是指的是相同Key的元素的个数,不是输入的所有元素的总数。

1.2.2 滑动窗口

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个是window_size,一个是sliding_size。
滑动步长:下面代码中的sliding_size设置为了2,也就是说,每收到两个相同key的数据就计算一次,每一次计算的window范围是5个元素

DataStream minTempPerWindowStream = dataStream
                .keyBy(SensorReading::getId) 
                .countWindow( 5, 2 )
                .minBy("temperature");

2. 基于窗口的聚合操作

1.窗口中收集数据的目的是为了做计算的,flink中提供了window function 用来定义要对窗口中收集的数据做的计算操作;

2.flink窗口的计算和输出可以独立进行;
也就是说可以只计算不输出,等到某一时刻才触发输出

2.1 增量聚合函数(incremental aggregation functions)

特点:来一条数据计算一次,但是不输出,当窗口达到闭合的条件的时候才会输出

典型的增量聚合函数有:

ReduceFunctionAggregateFunction(只能在keyedStream开窗后使用)Max、maxBy,min,minBy 2.2 全量窗口函数(full window functions)

特点:先把窗口所有数据收集起来不做计算,等到窗口关闭的时候,提供一个迭代器存储了窗口中所有的数据,等到触发计算的时候,会遍历所有数据,进行计算。

全窗口函数有:

ProcessWindowFunctionapply()

/ 
* @param function The window function.
 * @return The data stream that is the result of applying the window function to the window.
 */
public  SingleOutputStreamOperator apply(WindowFunction function) {
WindowFunction:是个接口,有四个参数。
 
* @param  The type of the input value.
 * @param  The type of the output value.
 * @param  The type of the key.
 * @param  The type of {@code Window} that this window function can be applied on.
 */
@Public
public interface WindowFunction extends Function, Serializable {
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.socketTextStream("hadoop102", 9999);
        SingleOutputStreamOperator> map = source.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                return new Tuple2<>(value, 1);
            }
        });

        KeyedStream, Tuple> kStream = map.keyBy(0);
        WindowedStream, Tuple, TimeWindow> ttwindow = kStream.timeWindow(Time.seconds(5));
       //todo  用全量函数做窗口的wordCount 全量函数有两种:apply和ProcessWindowFunction
       // 此处展示apply()的使用
       // apply的参数是windowFunction,windowFunction是一个接口,继承了Function和序列化
        SingleOutputStreamOperator> apply = ttwindow.apply(new myWindowFunction());

        apply.print();
        
        env.execute();

    }
    //todo 做wordCount  windowFunction需要四个泛型
//        * @param  The type of the input value.
//        * @param  The type of the output value.
//        * @param  The type of the key.(就是keydStream的key类型)
//        * @param  The type of {@code Window} that this window function can be applied on.
//
    public static class myWindowFunction
            implements WindowFunction,Tuple2,
            Tuple,TimeWindow>{

        @Override
        public void apply(Tuple tuple, TimeWindow window, Iterable> input, Collector> out) throws Exception {
             //TODO 实现apply方法,四个参数:
            //  第一个就是KeyedStream的key
            //  第二个是window  可以获取窗口的信息,比如窗口的起始和结束时间
            //  第三个是窗口所有数据组成的迭代器
            //  第四个是Collector,用于写出数据的

            //todo 1.获取key   改成String类型
            String key = tuple.getField(0);
            // todo 2.全量函数会将窗口key相同的数据装到一个迭代器中
            //  遍历迭代器处理key相同的数据集
            Iterator> inputIter = input.iterator();
            int count = 0;
            while(inputIter.hasNext()){
                count += inputIter.next().f1;
            }
            // todo 3.处理完毕 用collector进行输出  和 flatMap一样
            out.collect(new Tuple2(key,count));
        }



额外说明:

增量函数比全量效率高全量的应用场景为必须拿到所有数据才能做计算。全量函数可以拿到窗口信息,增量无法拿到窗口信息

apply函数:参数是WindowFunction

2.3 其它可选API trigger() 触发器

定义 window 什么时候关闭,触发计算并输出结果控制窗口什么时候关闭,数据什么时候计算。窗口的关闭,计算,输出三者都可以独立操作。 evitor() —— 移除器

定义移除某些数据的逻辑一般不用,窗口前就已经过滤了

以下三个函数只能在事件时间语义下使用

.allowedLateness() —— 允许处理迟到的数据.sideOutputLateData() —— 将迟到的数据放入侧输出流.getSideOutput() —— 获取侧输出流
如果waterMark已经达到了窗口闭合时间,但是允许迟到数据,就会将迟到数据放进侧输出流 3. 完整的窗口调用流程

额外说明:图中中括号是可选项,聚合函数是必选项(开窗就是为了做聚合计算的)

(1) Keyed Windows

(2) Noe-Keyed Windows

对于Non-keyed WindowAll来说必须要有触发器,否则不计算,会一直收集数据。TimeWindowAll可以自己触发计算windowAll操作一般不用了 四、窗口源码分析 1. 窗口的创建 1.1 timeWindow(Time Size)

1.2 timeWindow(Time size, Time slide)

1.3 窗口分配器-WindowAssigner

@PublicEvolving
public abstract class WindowAssigner implements Serializable {
	private static final long serialVersionUID = 1L;

	
	public abstract Collection assignWindows(T element, long timestamp, WindowAssignerContext context);

	
	public abstract Trigger getDefaultTrigger(StreamExecutionEnvironment env);

	
	public abstract TypeSerializer getWindowSerializer(ExecutionConfig executionConfig);

	
	public abstract boolean isEventTime();

	
	public abstract static class WindowAssignerContext {

		
		public abstract long getCurrentProcessingTime();

	}
}

WindowAssigner 可以分配窗口的类型。一个窗口分配器可以为一个element分配 zero or more 个窗口在一个窗口操作中,元素会按照key(keyedStream)和其所属的窗口进行分组,每一组叫做一个pane触发器(Trigger)决定某一个pane应该触发 WindowFunction生成该窗口的输出分配器的assignWindows方法用于给element分配窗口,其返回值是Collection,表示每个元素可能分配0到多个窗口 2. 处理时间窗口分配器 - TumblingProcessingTimeWindows


创建滚动窗口本质是用滚动窗口分配器-TumblingProcessingTimeWindows来创建的

public class TumblingProcessingTimeWindows extends WindowAssigner {
	private static final long serialVersionUID = 1L;

	private final long size;

	private final long offset;

	private TumblingProcessingTimeWindows(long size, long offset) {
		if (Math.abs(offset) >= size) {
			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
		}

		this.size = size;
		this.offset = offset;
	}

	@Override
	public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		final long now = context.getCurrentProcessingTime();
		long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
		return Collections.singletonList(new TimeWindow(start, start + size));
	}

	public long getSize() {
		return size;
	}

	@Override
	public Trigger getDefaultTrigger(StreamExecutionEnvironment env) {
		return ProcessingTimeTrigger.create();
	}

	@Override
	public String toString() {
		return "TumblingProcessingTimeWindows(" + size + ")";
	}

	
	public static TumblingProcessingTimeWindows of(Time size) {
		return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
	}

	
	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
	}

	@Override
	public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) {
		return new TimeWindow.Serializer();
	}

	@Override
	public boolean isEventTime() {
		return false;
	}
}
2.1 滚动时间窗口分配器

来观察TumblingProcessingTimeWindows 的窗口分配方法assignWindows:


知识点: timestamp - (timestamp - offset + windowSize) % windowSize

特点:窗口是按照 windowSize对齐的

比如:窗口大小为:5s,当前时间为9:00:019:00:01 - (9:00:01 - 0 + 5s)% 5 = 01 - (06)%5 = 0,所以窗口范围为[9:00:00,9:00:05)从上面也能看出偏移量能移动窗口的偏移量

+windowSize的目的:

timestamp - offset 可能为负数,当时间为1970年的时候,timestamp就是0,
这会导致start > timestamp

offset

时间偏移量的作用就是调整窗口的位置;offset > 0 是向后挪动;
比如offset设置为1s,那么窗口范围变为:[x分1s,x分6s)、[x分6s,x分11s)…应用场景:底层时间用的是UTC,对于中国东8区,比UTC标准时间早8h,那么在中国,如果窗口大小开为:Time.days(1),窗口范围就是早晨8点到晚上8点,并非0点到24点。需要将offset设置为-8,向前调整8h

知识点: 一次只返回一个窗口

2.2 滑动窗口分配器


SlidingProcessingTimeWindows


public class SlidingProcessingTimeWindows extends WindowAssigner {
	private static final long serialVersionUID = 1L;

	private final long size;

	private final long offset;

	private final long slide;

	private SlidingProcessingTimeWindows(long size, long slide, long offset) {
		if (Math.abs(offset) >= slide || size <= 0) {
			throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy " +
				"abs(offset) < slide and size > 0");
		}

		this.size = size;
		this.slide = slide;
		this.offset = offset;
	}

	@Override
	public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		timestamp = context.getCurrentProcessingTime();
		List windows = new ArrayList<>((int) (size / slide));
		long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
		for (long start = lastStart;
			start > timestamp - size;
			start -= slide) {
			windows.add(new TimeWindow(start, start + size));
		}
		return windows;
	}

	public long getSize() {
		return size;
	}

	public long getSlide() {
		return slide;
	}

	@Override
	public Trigger getDefaultTrigger(StreamExecutionEnvironment env) {
		return ProcessingTimeTrigger.create();
	}

	@Override
	public String toString() {
		return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
	}

	
	public static SlidingProcessingTimeWindows of(Time size, Time slide) {
		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
	}

	
	public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
	}

	@Override
	public TypeSerializer getWindowSerializer(ExecutionConfig executionConfig) {
		return new TimeWindow.Serializer();
	}

	@Override
	public boolean isEventTime() {
		return false;
	}
}

来观察SlidingProcessingTimeWindows是如何分配窗口的:

窗口个数:

size / slide

最后一个窗口的起始时间:
和滚动窗口的算法是一样的


分配的窗口有哪些?
for (long start = lastStart;start > timestamp - size; start -= slide)

比如一条数据时间是9:01,窗口大小为15min,步长5min

    window数组:Windows = new ArrayList<>(3)最后一个窗口的起始时间:lastStart = 9:00,利用窗口大小和步进遍历,创建新的窗口加入window数组中:
    [9:00 , 9:15)
    [8:55 , 9:10)
    [8:50 , 9:05)
3. 事件时间窗口分配器 4.窗口左闭右开的问题

TimeWindow类中:

5.窗口大小和滑动步长不是整数倍的问题

假设时间为9:01,窗口大小5min,步长2min

从图中可以看出来00-01s之间的数据落在三个窗口,01-02的数据落在2个窗口
结论:滑动窗口窗口大小和步长不是整数倍,会导致有的数据会落在 n+1个窗口,有的会落在n个窗口; n = WindowSize / SlideSize

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/707198.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号