栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink watermark 测试

Flink watermark 测试

代码如下:

package com.cuichunchi.watermark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Iterator;


public class TestWaterMark {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.BIND_PORT,"19082");
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createLocalEnvironmentWithWebUI(conf);
//                .getExecutionEnvironment();

        env.setParallelism(2);
        env.disableOperatorChaining();
        DataStreamSource socketTextStream = env.socketTextStream("s201", 9099);

        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        SingleOutputStreamOperator> tuple2Map = socketTextStream.map(new MapFunction>() {
            @Override
            public Tuple3 map(String value) throws Exception {
                return new Tuple3<>(value.split(",")[0],1, Long.parseLong(value.split(",")[1]));
            }
        });
        //提取时间戳
        SingleOutputStreamOperator> tuple2WMDS = tuple2Map.assignTimestampsAndWatermarks(
                //设置几秒watermark
                WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner>() {
                            @Override
                            public long extractTimestamp(Tuple3 element, long recordTimestamp) {
                                System.out.println("-----key:"+element.f0+"-----提取时间戳:" + dateFormat.format( element.f2 )+ "," +
                                        "默认处理时间戳:" + recordTimestamp);
                                return element.f2;
                            }
                        })
                //如果窗口一直没有数据,导致watermark不会向前推进,那么就一直不会触发窗口计算,该参数就是解决窗口不会被触发的问题
                //后面有时间再测试这个,需要多个分区测试
                
//                .withIdleness(Duration.ofSeconds(10))
        );
        //TODO 说明:
        

        //简单实时聚合
        
        //TODO 通过源码 TimeWindow#getWindowStartWithOffset()来生成得watermark
        //TODO org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows.assignWindows()
        //TODO org.apache.flink.streaming.api.windowing.windows.TimeWindow.getWindowStartWithOffset()
        //开窗聚合
        OutputTag> diltyData = new OutputTag>("data"){};
        SingleOutputStreamOperator> processDS =
                tuple2WMDS
                .keyBy(value -> value.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
//                容忍延迟 几秒的数据
                .allowedLateness(Time.seconds(5))
                //将延迟到的数据写入到侧输出流
                .sideOutputLateData(diltyData)
                .process(new ProcessWindowFunction, Tuple2, String,
                        TimeWindow>() {

                    @Override
                    public void process(String s, Context context, Iterable> elements,
                                        Collector> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        long currentWatermark = context.currentWatermark();
                        int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
                        System.out.println("======参数打印:当前subtasks:"+numberOfParallelSubtasks+",key:"+s+ ",elements:"+elements.toString()
                        +",当前开始window:["+dateFormat.format(start)+"],当前结束window:["+dateFormat.format(end)+
                                "],当前watermark:"+dateFormat.format(currentWatermark));

                        Iterator> it = elements.iterator();
                        int sum = 0;
                        while (it.hasNext()){
                            Tuple3 next = it.next();
                            sum += next.f1;
                        }
                        out.collect(new Tuple2<>(s,sum));
                    }
                }).setParallelism(2);


        processDS.print("结果输出====》");
        processDS.getSideOutput(diltyData).print("迟到数据测输出流====》");

        env.execute();
    }
}

单分区测试结果:


没有设置watermark延迟(乱序容忍时间为0秒)
-------提取时间戳:2020-04-10 11:32:46,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:47,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:48,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:49,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:50,默认处理时间戳:-9223372036854775808
======参数打印:01,elements:[(01,1,1586489566000), (01,1,1586489567000), (01,1,1586489568000), (01,1,1586489569000)],当前开始window:[2020-04-10 11:32:45],当前结束window:[2020-04-10 11:32:50],当前watermark:2020-04-10 11:32:49 (收到50秒的数据时触发这个窗口计算)
(01,4)
-------提取时间戳:2020-04-10 11:32:51,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:52,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:53,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:54,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:55,默认处理时间戳:-9223372036854775808
======参数打印:01,elements:[(01,1,1586489570000), (01,1,1586489571000), (01,1,1586489572000), (01,1,1586489573000), (01,1,1586489574000)],当前开始window:[2020-04-10 11:32:50],当前结束window:[2020-04-10 11:32:55],当前watermark:2020-04-10 11:32:54
(01,5)


设置2秒watermark
-------提取时间戳:2020-04-10 11:32:46,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:47,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:48,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:50,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:51,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:52,默认处理时间戳:-9223372036854775808
======参数打印:01,elements:[(01,1,1586489566000), (01,1,1586489567000), (01,1,1586489568000)],当前开始window:[2020-04-10 11:32:45],当前结束window:[2020-04-10 11:32:50],当前watermark:2020-04-10 11:32:49  (到52秒触发这个窗口计算,多等待了2秒)
结果输出====》> (01,3)
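-------提取时间戳:2020-04-10 11:32:49,默认处理时间戳:-9223372036854775808   ------ 丢数据了,因为上一个窗口[45,50)已经计算过了,被销毁了(本次测试未设置allowedLateness;若像上面代码那样设置了allowedLateness(5秒),该窗口的状态会保留到watermark超过窗口结束时间+5秒为止,这条迟到数据会再次触发该窗口的计算;超过允许延迟的数据则会进入侧输出流)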
-------提取时间戳:2020-04-10 11:32:54,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:55,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:56,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:57,默认处理时间戳:-9223372036854775808
======参数打印:01,elements:[(01,1,1586489570000), (01,1,1586489571000), (01,1,1586489572000), (01,1,1586489574000)],当前开始window:[2020-04-10 11:32:50],当前结束window:[2020-04-10 11:32:55],当前watermark:2020-04-10 11:32:54   (到57秒才开始这个窗口的计算)
结果输出====》> (01,4)


设置5秒的watermark
-------提取时间戳:2020-04-10 11:32:46,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:47,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:50,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:53,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:54,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:32:55,默认处理时间戳:-9223372036854775808
======参数打印:01,elements:[(01,1,1586489566000), (01,1,1586489567000)],当前开始window:[2020-04-10 11:32:45],当前结束window:[2020-04-10 11:32:50],当前watermark:2020-04-10 11:32:49   (到55秒触发这个窗口的计算,延迟了5秒)
结果输出====》> (01,2)
-------提取时间戳:2020-04-10 11:32:57,默认处理时间戳:-9223372036854775808
-------提取时间戳:2020-04-10 11:33:00,默认处理时间戳:-9223372036854775808
======参数打印:01,elements:[(01,1,1586489570000), (01,1,1586489573000), (01,1,1586489574000)],当前开始window:[2020-04-10 11:32:50],当前结束window:[2020-04-10 11:32:55],当前watermark:2020-04-10 11:32:54   (到60秒的时候触发窗口计算,延迟5秒)
结果输出====》> (01,3)

多分区测试-->涉及到watermark的传递(watermark设置2秒)

设置两个分区:

14:56:57.779 [flink-akka.actor.default-dispatcher-36] ERROR org.apache.flink.runtime.rest.handler.job.JobDetailsHandler - Exception occurred in REST handler: Job aa25794d54a90d9f39bc3367c8661a09 not found
14:56:57.835 [flink-akka.actor.default-dispatcher-32] ERROR org.apache.flink.runtime.rest.handler.job.JobExceptionsHandler - Exception occurred in REST handler: Job aa25794d54a90d9f39bc3367c8661a09 not found
-----key:01-----提取时间戳:2020-04-10 11:32:46,默认处理时间戳:-9223372036854775808
-----key:02-----提取时间戳:2020-04-10 11:32:46,默认处理时间戳:-9223372036854775808
-----key:01-----提取时间戳:2020-04-10 11:32:48,默认处理时间戳:-9223372036854775808
-----key:02-----提取时间戳:2020-04-10 11:32:53,默认处理时间戳:-9223372036854775808
-----key:01-----提取时间戳:2020-04-10 11:32:52,默认处理时间戳:-9223372036854775808
======参数打印:当前subtasks:2,key:01,elements:[(01,1,1586489566000), (01,1,1586489568000)],当前开始window:[2020-04-10 11:32:45],当前结束window:[2020-04-10 11:32:50],当前watermark:2020-04-10 11:32:49
======参数打印:当前subtasks:2,key:02,elements:[(02,1,1586489566000)],当前开始window:[2020-04-10 11:32:45],当前结束window:[2020-04-10 11:32:50],当前watermark:2020-04-10 11:32:49
结果输出====》:1> (01,2)
结果输出====》:1> (02,1)
说明:时间戳提取和watermark生成是在map之后的assignTimestampsAndWatermarks算子中进行的(并行度为2)。socket获取数据后轮询地发送到下游的两个分区,每个分区各自生成watermark并广播到下游所有的窗口task;下游task按照木桶效应,取所有输入分区中最小的watermark来进行窗口触发。所以:第一次发送46秒的数据,生成44秒的watermark,由0号分区发出;第二次发送46秒的数据,生成44秒的watermark,由1号分区发出;第三次发送48秒的数据,生成46秒的watermark,由0号分区发出;第四次发送53秒的数据,生成51秒的watermark,由1号分区发出。此时各分区的watermark为(46,51),取最小值46秒,还未到窗口[45,50)的触发时机,所以没有触发窗口计算。第五次发送52秒的数据,生成50秒的watermark,由0号分区发出,此时各分区的watermark为(50,51),取最小值50秒,达到了窗口[45,50)的结束边界,于是触发了窗口的计算。watermark实际为50秒减1毫秒,所以日志中显示为49秒。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745875.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号