栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Flink】时间语义和WaterMark

【Flink】时间语义和WaterMark

时间语义和WaterMark

时间语义WaterMark

时间语义

在Flink中时间可以分为三种,分别是

1️⃣:Event Time:事件创建的时间

2️⃣:Ingestion Time:数据进入Flink的时间

3️⃣:Processing Time:执行操作算子的本地系统时间,与机器相关


谈这三个时间主要是为了引出watemark,因为很多场景下,事件发生的时间事件时间是我们业务所关心的,基于事件时间计算,采用某种策略,则无论是采用实时流数据还是历史数据,都可以保证结果是一致的为了更生动的描述事件时间和事件流进系统(这里指Flink)的关系,但是在传输过程中数据并不能按照时间戳的顺序传输到程序中

1️⃣:针对实时流计算,一般的处理方式是来一个元素处理一个元素,这样才能实时。但是针对基于Event Time的一些应用,我们要求处理的准确性,必须缓存,因为第一个事件到达时,不知道后面来的事件发生的时间比当前的事件早,因此必须要等到至少第二个事件到达才能确定是否输出第一个事件的计算结果,这样就会造成延迟。

2️⃣:但是在第二个事件到达后,是否还有事件比其发生的事件更早呢,是否继续缓存等待下去?如果等待下去,等待多久呢?因此必须要有个机制策略保证不再等待,触发当前缓存的数据计算并输出。

3️⃣:那么,当前的计算已经计算并输出,如果再较早发生的事件晚到达了,怎么处理?我们想到了两种处理策略:1,把迟到的事件加进上次缓存数据中重新计算输出; 2,丢弃不计算第二种策略丢弃不计算好处理,第一种策略需要上次的缓存数据,这里又会面临一个两个问题:1,上次缓存数据计算后不能清除缓存; 2,缓存要保留多久,因为如果一直保留缓存,势必造成增加整个系统的内存压力等。

这时就需要使用waterMark了 WaterMark

Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常一条记录中某个字段就代表了该记录的发生时间。例如基于Event Time的数据,自身都包含一个类型为timestamp的属性

基于该属性定义一个策略为偏移3s的watermark,这条数据的水印时间戳则是:该属性的时间戳-3000

这时若我们定义的时间窗口的时间为15s,当第十五秒的时间到了后并不会结束,因为此时的waterMark时间比window慢三秒

图解watermark
我们设置一个偏移为5秒的watermark策略,大小为10秒的窗口,为了能更好的理解watermark,我们作如下类比,数据发生的时间空间为A时间空间,watermark的时间空间为B时间空间,则B时间空间总比A时间空间晚5秒发生

如上图,矩形小框代表窗口大小,大小为10秒,Flink默认会根据选择的时间(这里是Event Time)分配窗口。假设数据发生的时间rowtime从0开始,则预先分配的窗口即使[0,10),[10,20],[20,30],[30,40]

A时间轴上的时刻是一定的,同样B时间轴上的时刻也是一定的,B空间时间轴上的时刻相对A时刻轴上的时刻总是晚5秒。在同一时间坐标系S下,假设S时间坐标和A时间一样,则A时间轴上的时刻在S坐标系下时间值不变,但B时间轴上的时刻在S时间坐标系下时间值都变“大” 5s了。即在第一个窗口[0,10],如果一个记录中rowtime为10s的数据在S坐标系下9s到达了,但是其watemark其实是10-5 = 5s,还没有到达第一个窗口的end Time,故不会触发窗口计算;如果一个记录中rowtime为8s的数据在S坐标系下12s到达了,但其watermark其实是8-5=3s小于之前的watermark,故此时不更新watermark(一般情况下),watermark的时间戳仍然是5秒,也没有达到第一个窗口的触发条件;如果一个记录中rowtime为12s的数据在S坐标系下13s到达了,其watemark其实是12-5 = 7 > 5,更新watermark的时间戳为7秒,但是也没有达到一个窗口的触发条件;如果一个记录中rowtime为15s的数据到达了,其watemark其实是15 -5 = 10s,达到了触发条件 ,大于window endTime,故窗口此时触发计算,如果后面再有rowtime<10s的数据到达,将会被丢弃(没有设置latness选项)


代码实现
Event Time 的使用一定要指定数据源中的时间戳调用assignTimestampAndWatermarks方法,传入一个BoundedOutOfOrdernessTimestampExtractor,就可以指定
若数据是有序的,不需要延迟触发,可以只指定时间戳就行了
关于latness的设置,latness主要是处理迟到的数据

        OutputTag late = new OutputTag<>("late");
        dataStream
                .keyBy("id")
                .timeWindow(Time.seconds(15))
                .allowedLateness(Time.minutes(1))//允许超市一分钟
                .sideOutputLateData(late);//超时的数据单独分成与i个流

吃到时间在一分钟内的数据可以加入计算,时间超过了一分钟的化就会保存到late流里等待处理
跳转顶部


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/734672.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号