Flink idle windows: withIdleness

Flink idle windows

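When a Flink job runs with parallelism greater than 1, an operator's event-time clock is the minimum of the watermarks from all of its parallel inputs. If one of those inputs receives no data, its watermark never advances. The inputs that do have data may carry watermarks past a window's firing boundary, but the watermarks are not aligned, so the window does not fire. This article calls such a data-less input an idle window. Setting an idle timeout with withIdleness marks the silent input as idle once the timeout elapses, so the windows that do have data can fire.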
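The mechanism above can be sketched in plain Java without any Flink dependency. This is a simplified model, not Flink's actual implementation: the class `WatermarkMerge` and its methods are hypothetical names, and the model assumes only that the combined watermark is the minimum over the inputs that are not marked idle and that it never moves backward.

```java
import java.util.Arrays;

/** Simplified model of combining watermarks from parallel inputs (hypothetical names, not Flink code). */
final class WatermarkMerge {
    private final long[] watermarks;   // last watermark seen per input
    private final boolean[] idle;      // true once an input has been marked idle
    private long lastCombined = Long.MIN_VALUE;

    WatermarkMerge(int inputs) {
        watermarks = new long[inputs];
        idle = new boolean[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** A watermark arrives on an input; the input becomes active again. */
    void onWatermark(int input, long watermark) {
        idle[input] = false;
        watermarks[input] = Math.max(watermarks[input], watermark);
    }

    /** Assumption: this is what the idle timeout does once an input has been silent long enough. */
    void markIdle(int input) {
        idle[input] = true;
    }

    /** Minimum over the active inputs; the result never decreases. */
    long combined() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive) {
            lastCombined = Math.max(lastCombined, min);
        }
        return lastCombined;
    }

    public static void main(String[] args) {
        WatermarkMerge merge = new WatermarkMerge(2);
        merge.onWatermark(0, 9_999L);
        // Input 1 has reported nothing, so the combined watermark is still Long.MIN_VALUE.
        System.out.println(merge.combined() == Long.MIN_VALUE); // true
        merge.markIdle(1);
        // With input 1 excluded, input 0 alone drives the combined watermark.
        System.out.println(merge.combined()); // 9999
    }
}
```

In this model the window [0, 10000) can fire only after `markIdle(1)` is called, which mirrors the behaviour described above for a task that receives no data.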

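parallelism: 2

windowSize: 10s

forBoundedOutOfOrderness: 5s

withIdleness: 10s

Window n fires once a record with timestamp >= (n+1)*size + 5 has been seen, so the records that arrive before it fires have timestamps in [n*size, (n+1)*size + 5)

Records that belong to window n have timestamps in [n*size, (n+1)*size)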
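The two ranges follow from simple arithmetic, sketched here in plain Java. The class `WindowMath` and its method names are hypothetical. The sketch assumes non-negative timestamps in milliseconds, tumbling windows with zero offset, a bounded-out-of-orderness watermark of `maxTimestamp - outOfOrderness - 1`, and a window that fires when the watermark reaches `windowEnd - 1`; these assumptions match the watermark values 9999 and 19999 printed in the output of this article.

```java
/** Window and watermark arithmetic for the parameters above (hypothetical helper, not Flink code). */
final class WindowMath {
    /** Start of the tumbling window containing ts (non-negative ts, zero offset). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Exclusive end of the tumbling window containing ts. */
    static long windowEnd(long ts, long size) {
        return windowStart(ts, size) + size;
    }

    /** Watermark after the largest timestamp seen so far is maxTs. */
    static long watermark(long maxTs, long outOfOrderness) {
        return maxTs - outOfOrderness - 1;
    }

    /** An event-time window fires once the watermark reaches its last millisecond. */
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L;   // windowSize: 10s
        long delay = 5_000L;   // forBoundedOutOfOrderness: 5s

        // Record "a 15" has timestamp 15000 ms and belongs to window [10000, 20000).
        System.out.println(windowStart(15_000L, size)); // 10000
        // It moves the watermark to 9999, which is enough to fire window [0, 10000).
        long wm = watermark(15_000L, delay);
        System.out.println(wm);                          // 9999
        System.out.println(fires(10_000L, wm));          // true
        // A record at 14000 ms would not be enough: its watermark is 8999.
        System.out.println(fires(10_000L, watermark(14_000L, delay))); // false
    }
}
```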

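Input:
a 1
a 10
a 15	-- reaches the firing boundary of the first window, but there are two tasks and the other task has no data, so the window does not fire normally
a 20	-- task 2 data; timestamp 20 s falls in window [20 s, 30 s)
b 1	-- late, dropped
b 19	-- second window, task 1 data
b 25	-- task 1 data; reaches the firing boundary of the second window, so the window fires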
Output:
2> (a,1)
=======================  ^^^ window_num:	0 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 9999
======================================================
Only one of the two tasks has data, so the window fires only after the 10 s idle timeout has elapsed.

1> (b,19)
2> (a,10)
=======================  ^^^ window_num:	0 ^^^ ==================
======================= range: [10000___20000 )
======================= key: b
2> (side-a,10)
======================= watermark: 19999
======================================================
2> (a,15)
=======================  ^^^ window_num:	1 ^^^ ==================
======================= range: [10000___20000 )
======================= key: a
======================= watermark: 19999
======================================================
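Both tasks have data; as soon as either one receives a record whose timestamp reaches the firing boundary, the windows in both tasks fire.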

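Demo:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;

// The class declaration was missing from the original listing; the name IdlenessDemo is a placeholder
public class IdlenessDemo {
    // side-output tag
    private static OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("dayle_data2"){};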

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Tuple2<String, Integer>> dataStreams = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    Random random = new Random();
                    int tp2 = 0;
                    int speed = 0;
                    int f1 = -1;

                    @Override
                    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                        while (true) {
                            TimeUnit.SECONDS.sleep(1);
                            tp2 = Math.abs(random.nextInt() % 7);
                            f1 = Math.abs(++speed + tp2);
                            ctx.collect(Tuple2.of("a", f1));
                            System.out.println("source generator :\t" + f1);
                        }
                    }

                    @Override
                    public void cancel() {

                    }
                });

  
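        // the tag must be created as an anonymous inner class so that its element type is retained
        // the element type was lost from the original listing; String is an assumption
        OutputTag<String> erroOutputTag = new OutputTag<String>("formatExceptionData"){} ;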

        SingleOutputStreamOperator<Tuple2<String, Integer>> socketTextStream = env
                .socketTextStream("localhost", 10086)
                // route malformed records to the side output
                .process(new FlatMapProcess(erroOutputTag));

        SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Stream = ReduceWindowPrint(socketTextStream, 5);

        // read the side outputs
        socketTextStream.getSideOutput(erroOutputTag).print();
        tuple2Stream.getSideOutput(outputTag).print();

        // main stream of normally processed records
        tuple2Stream.print();

        env.execute("event time process ");
    }

    private static SingleOutputStreamOperator<Tuple2<String, Integer>> ReduceWindowPrint(SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreams, int dayleTime) {
        return dataStreams
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
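                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(dayleTime))
                                // the second field is the event time in seconds; convert it to milliseconds
                                .withTimestampAssigner((e, t) -> e.f1 * 1000L)
                                // With parallelism > 1 the downstream watermark is the minimum over all inputs,
                                // so an input with no data holds the watermark back and no window fires.
                                // That stalls every downstream operator.
                                // withIdleness marks an input as idle once it has been silent for the given
                                // time (10s below); the inputs that have data then drive the watermark and windows fire.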
                                .withIdleness(Duration.ofSeconds(10))
                )
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // allowed lateness for records of a window that has already fired
//                .allowedLateness(Time.seconds(5))
               
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {

                    int i;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        i = 0;
                    }

                    @Override
                    public synchronized void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                        Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
                        while (iterator.hasNext()) {
                            Tuple2<String, Integer> next = iterator.next();
                            out.collect(next);
                            if (next.f1%2 == 0) {
                                // also emit records with an even value to the side output
                                context.<Tuple2<String, Integer>>output(outputTag,Tuple2.of("side-" + next.f0, next.f1));
                            }
                        }
                        System.out.println("=======================  ^^^ window_num:\t" + i++ + " ^^^ ==================");
                        System.out.println("======================= range: [" + context.window().getStart() + "___" + context.window().getEnd() + " )");
                        System.out.println("======================= key: " + s );
                        System.out.println("======================= watermark: " + context.currentWatermark());
                        System.out.println("======================================================");
                    }
                }); 
    }
}
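
The demo passes socket lines to a `FlatMapProcess` class that the listing does not include. The parsing step it would need can be sketched in plain Java as follows. This is only an illustration under assumptions: the class `LineParser` is a hypothetical name, and the line format `key seconds` is inferred from the sample input (`a 10`, `b 19`). A line that does not parse yields an empty result, which is where a real implementation would presumably emit to the error side output.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

/** Parses lines such as "a 10" into (key, seconds); hypothetical helper, not part of the original demo. */
final class LineParser {
    static Optional<Map.Entry<String, Integer>> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.trim().split("\\s+");
        if (parts.length != 2) {
            return Optional.empty(); // wrong number of fields
        }
        try {
            Map.Entry<String, Integer> entry = new SimpleEntry<>(parts[0], Integer.valueOf(parts[1]));
            return Optional.of(entry);
        } catch (NumberFormatException e) {
            return Optional.empty(); // second field is not an integer
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("a 10").isPresent()); // true
        System.out.println(parse("oops").isPresent()); // false
    }
}
```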