栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink中allowedLateness介绍与测试

Flink中allowedLateness介绍与测试

默认情况下,当watermark通过end-of-window之后,再有之前的数据到达时,这些数据会被删除。

为了避免有些迟到的数据被删除,因此产生了allowedLateness的概念。

简单来讲,allowedLateness就是针对event time而言,对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据

默认情况下,如果不指定allowedLateness,其值是0,即对于watermark超过end-of-window之后,还有此window的数据到达时,这些数据被删除掉了。

注意:对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据,会buffer起来,直到watermark超过end-of-window + watermark设置的时间+allowedLateness()的时间,窗口的数据及元数据信息才会被删除。再次计算就是DataFlow模型中的Accumulating(积累)的情况。

什么情况下数据会被丢弃或者说不会被计算?

两种情况:

a.未设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结束时间,则该条数据会被丢弃;

b.设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结束时间+延迟时间,则该条数据会被丢弃;

 也就是说如果一个key下面的某条数据如果延迟到来太多,就会被丢弃,这个问题可以使用测输出流来解决最后还是迟到的数据;

因为其他key的数据会上报就会提高watermark,最后触发窗口计算。

代码测试:

package com.cuichunchi.watermark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;


public class TestWaterMark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.disableOperatorChaining();
        DataStreamSource socketTextStream = env.socketTextStream("s201", 9099);

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        SingleOutputStreamOperator> tuple2Map = socketTextStream.map(new MapFunction>() {
            @Override
            public Tuple3 map(String value) throws Exception {
                return new Tuple3<>(value.split(",")[0],1, Long.parseLong(value.split(",")[1]));
            }
        });
        //提取时间戳
        SingleOutputStreamOperator> tuple2WMDS = tuple2Map.assignTimestampsAndWatermarks(
                //设置几秒watermark
                WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner>() {
                            @Override
                            public long extractTimestamp(Tuple3 element, long recordTimestamp) {
                                System.out.println("-------提取时间戳:" + dateFormat.format( element.f2 )+ ",默认处理时间戳:" + recordTimestamp);
                                return element.f2;
                            }
                        })
                .withIdleness(Duration.ofMillis(3000))
        );

        //简单实时聚合
        
        //TODO 通过源码 TimeWindow#getWindowStartWithOffset()来生成得watermark
        //TODO org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows()
        //TODO org.apache.flink.streaming.api.windowing.windows.TimeWindow.getWindowStartWithOffset()
        //开窗聚合
        OutputTag> diltyData = new OutputTag>("data"){};
        SingleOutputStreamOperator> processDS =
                tuple2WMDS.keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
//                容忍延迟 几秒的数据
                .allowedLateness(Time.seconds(5))
                //将延迟到的数据写入到侧输出流
                .sideOutputLateData(diltyData)
                .process(new ProcessWindowFunction, Tuple2, String,
                        TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable> elements,
                                        Collector> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        long currentWatermark = context.currentWatermark();
                        System.out.println("======参数打印:"+s+ ",elements:"+elements.toString()
                        +",当前开始window:["+dateFormat.format(start)+"],当前结束window:["+dateFormat.format(end)+
                                "],当前watermark:"+dateFormat.format(currentWatermark));

                        Iterator> it = elements.iterator();
                        int sum = 0;
                        while (it.hasNext()){
                            Tuple3 next = it.next();
                            sum += next.f1;
                        }
                        out.collect(new Tuple2<>(s,sum));
                    }
                });


        processDS.print("结果输出====》");
        processDS.getSideOutput(diltyData).print("迟到数据测输出流====》");

        env.execute();
    }
}

watermark 设置的2秒,allowedLateness设置了5秒,window设置5秒。第一个窗口:[45,50)因为watermark 是2秒,所以推迟2秒 第一个窗口才触发,在52秒的时候才会触发,并且 允许延迟时间设置的5秒,所以第一个窗口在 50+2+5 = 57秒的时候,第一个窗口才会被清除。

测试结果:

-------提取时间戳:2020-04-10 11:32:46,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:47,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:52,默认处理时间戳:-9223372036854775808
======参数打印:01,elements:[(01,1,1586489566000), (01,1,1586489567000)],当前开始window:[2020-04-10 11:32:45],当前结束window:[2020-04-10 11:32:50],当前watermark:2020-04-10 11:32:49
结果输出====》> (01,2)   (最后的结束窗口时间为50,那么在52秒的时候触发第一次窗口计算)
-------提取时间戳:2020-04-10 11:32:49,默认处理时间戳:-9223372036854775808
======参数打印:01,elements:[(01,1,1586489566000), (01,1,1586489567000), (01,1,1586489569000)],当前开始window:[2020-04-10 11:32:45],当前结束window:[2020-04-10 11:32:50],当前watermark:2020-04-10 11:32:49
结果输出====》> (01,3) (在允许迟到时间5秒范围内,50 + 2 + 5 = 57,如果超过57秒,就会停清除这个窗口的元数据,不会再触发计算,在范围内,每次到的数据都会触发一次计算)
-------提取时间戳:2020-04-10 11:32:57,默认处理时间戳:-9223372036854775808
======参数打印:01,elements:[(01,1,1586489572000)],当前开始window:[2020-04-10 11:32:50],当前结束window:[2020-04-10 11:32:55],当前watermark:2020-04-10 11:32:54
结果输出====》> (01,1)  (57秒,触发的是第二个窗口,并且第一个窗口被清除元数据)
-------提取时间戳:2020-04-10 11:32:47,默认处理时间戳:-9223372036854775808
迟到数据测输出流====》> (01,1,1586489567000)  (再次发送第一个窗口内的数据,因为第一个窗口已经被清除,所以最后在测输出流输出了)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745591.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号