
flink

sideOutput

Side outputs send some or all of the records in the current stream to separate, additional streams.

Uses:

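    Data selection

    Records in the stream being processed can be selected with custom logic and sent to an independent side stream.

    Retaining abnormal data

    Abnormal records are sent to an independent side stream and handled separately, for example keeping late data that arrives after its window has already closed.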

How to use it

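Side outputs can be emitted from the following functions (other functions do not appear to provide a Context object):

ProcessFunction, KeyedProcessFunction, CoProcessFunction, KeyedCoProcessFunction, ProcessWindowFunction, ProcessAllWindowFunction

Their process method receives a Context object whose output(OutputTag<X> outputTag, X value) method collects the records destined for the side output. Calling getSideOutput(outputTag) on the stream returned by process() retrieves that side stream.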

DataStream<Integer> input = ...;

// Define the output tag
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

SingleOutputStreamOperator<Integer> mainDataStream = input
  .process(new ProcessFunction<Integer, Integer>() {

      @Override
      public void processElement(
          Integer value,
          Context ctx,
          Collector<Integer> out) throws Exception {
        // Emit data to the main output
        out.collect(value);

        // Emit data to the side output
        ctx.output(outputTag, "sideout-" + String.valueOf(value));
      }
    });

// Retrieve the side stream collected above
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
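To make the contract above concrete without a Flink cluster, here is a toy model in plain Java. Tag, Result, process and OUTPUT_TAG are hypothetical names, not Flink classes; the model only mirrors the behavior described: one record can go to the main output and, with a different type, to a side output, and the side records are read back by their tag.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the side-output contract (hypothetical names; not Flink's classes).
public class ToySideOutput {

    // Stand-in for OutputTag<T>: a typed handle identified by its id.
    static final class Tag<T> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    // Stand-in for an operator result: one main output plus side outputs keyed by tag id.
    static final class Result<M> {
        final List<M> main = new ArrayList<>();
        final Map<String, List<Object>> side = new HashMap<>();

        // Like Context.output(tag, value): the value's type is fixed by the tag, not by M.
        <T> void output(Tag<T> tag, T value) {
            side.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
        }

        // Like getSideOutput(tag): the records come back with the tag's type.
        @SuppressWarnings("unchecked")
        <T> List<T> getSideOutput(Tag<T> tag) {
            List<?> records = side.getOrDefault(tag.id, new ArrayList<>());
            return (List<T>) records;
        }
    }

    static final Tag<String> OUTPUT_TAG = new Tag<>("side-output");

    // Mirrors processElement above: each value to main, "sideout-" + value to the side.
    static Result<Integer> process(List<Integer> input) {
        Result<Integer> result = new Result<>();
        for (Integer value : input) {
            result.main.add(value);
            result.output(OUTPUT_TAG, "sideout-" + value);
        }
        return result;
    }

    public static void main(String[] args) {
        Result<Integer> r = process(List.of(1, 2, 3));
        System.out.println(r.main);                      // [1, 2, 3]
        System.out.println(r.getSideOutput(OUTPUT_TAG)); // [sideout-1, sideout-2, sideout-3]
    }
}
```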
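Application: data selection (stream splitting)

Records from the stream being processed can be sent to several side streams.

Example:

Records from the source that fail to parse go to the formatExceptionData side stream; records in a window whose value is even go to the dayle_data2 side stream.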
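Before the full job, the parse rule used by FlatMapProcess can be checked in plain Java. This sketch copies that rule (split on whitespace, exactly two fields, integer second field) and returns a label instead of emitting to Flink; ParseRouting and the "main:"/"side:" labels are hypothetical. Note that the regex must be written "\\s+" in Java source: since Java 15, "\s" inside a string literal is an escape for a single space, so "\s+" would only split on spaces.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java replica of FlatMapProcess's routing rule (no Flink needed).
public class ParseRouting {

    // Returns "main:<key>,<value>" for well-formed lines, "side:<line>" otherwise.
    static String route(String line) {
        String[] infos = line.split("\\s+");
        if (infos.length != 2) {
            return "side:" + line;                 // wrong number of fields
        }
        try {
            return "main:" + infos[0] + "," + Integer.valueOf(infos[1]);
        } catch (NumberFormatException e) {
            return "side:" + line;                 // second field is not an integer
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        // "a " splits to ["a"]; "" splits to [""]; " a 1" splits to ["", "a", "1"]
        for (String line : new String[] {"a ", "", "a 10", "a x", " a 1"}) {
            out.add(route(line));
        }
        out.forEach(System.out::println);
    }
}
```

The first two inputs are the two malformed lines from the Input listing further down, which is why both show up as parseException in the output.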

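import java.time.Duration;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TransformOperator {

    // Side output tag for even values
    private static OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("dayle_data2"){};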

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Random test source (not used by the job below)
        DataStreamSource<Tuple2<String, Integer>> dataStreams = env
                .addSource(new SourceFunction<Tuple2<String, Integer>>() {
                    private volatile boolean running = true;
                    Random random = new Random();
                    int tp2 = 0;
                    int speed = 0;
                    int f1 = -1;

                    @Override
                    public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                        while (running) {
                            TimeUnit.SECONDS.sleep(1);
                            tp2 = Math.abs(random.nextInt() % 7);
                            f1 = Math.abs(++speed + tp2);
                            ctx.collect(Tuple2.of("a", f1));
                            System.out.println("source generator :\t" + f1);
                        }
                    }

                    @Override
                    public void cancel() {
                        // Stop the emit loop in run()
                        running = false;
                    }
                });

        // Boundary test for late-data triggering
        // The OutputTag must be an anonymous inner class (note the trailing {})
        OutputTag<String> erroOutputTag = new OutputTag<String>("formatExceptionData"){};

        SingleOutputStreamOperator<Tuple2<String, Integer>> socketTextStream = env
                .socketTextStream("10.164.29.143", 10086)
                .process(new FlatMapProcess(erroOutputTag));

        SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Stream = ReduceWindowPrint(socketTextStream, 5);

        // Retrieve the side streams
        socketTextStream.getSideOutput(erroOutputTag).print();
        tuple2Stream.getSideOutput(outputTag).print();

        // Main stream of normally processed records
        tuple2Stream.print();

        env.execute("event time process ");
    }

    private static SingleOutputStreamOperator<Tuple2<String, Integer>> ReduceWindowPrint(SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreams, int dayleTime) {
        return dataStreams
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(dayleTime))
                                .withTimestampAssigner((e, t) -> e.f1 * 1000L)
                                .withIdleness(Duration.ofSeconds(10))
                )
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {

                    int i;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);

                        i = 0;
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                        Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
                        while (iterator.hasNext()) {
                            Tuple2<String, Integer> next = iterator.next();
                            out.collect(next);
                            if (next.f1 % 2 == 0) {
                                // Send the record to the side output
                                context.output(outputTag, Tuple2.of("side-" + next.f0, next.f1));
                            }
                        }
                        System.out.println("=======================  ^^^ window_num:\t" + i++ + " ^^^ ==================");
                        System.out.println("======================= range: [" + context.window().getStart() + "___" + context.window().getEnd() + " )");
                        System.out.println("======================= key: " + s );
                        System.out.println("======================= watermark: " + context.currentWatermark());
                        System.out.println("======================================================");
                    }
                });
    }
}

class FlatMapProcess extends ProcessFunction<String, Tuple2<String, Integer>> {

    private OutputTag<String> outputTag;

    public FlatMapProcess(OutputTag<String> outputTag) {
        this.outputTag = outputTag;
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        try {
            // "\\s+" = one or more whitespace characters
            String[] infos = value.split("\\s+");
            if (infos.length == 2) {
                out.collect(Tuple2.of(infos[0], Integer.valueOf(infos[1])));
            } else {
                ctx.output(outputTag, "format exception format_:\t" + value);
            }
        } catch (NumberFormatException e) {
            ctx.output(outputTag, "format exception format_:\t" + value);
        }
    }
}
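The code comments insist that each OutputTag be an anonymous inner class (the trailing {}). The reason is Java type erasure: a plain new OutputTag<String>("x") keeps no trace of String at runtime, but an anonymous subclass records its type argument in its generic superclass, where reflection can read it. As far as we know, Flink's single-argument OutputTag constructor derives the TypeInformation for T this way and throws an InvalidTypesException otherwise; OutputTag also has a constructor that takes an explicit TypeInformation. The sketch below shows only the Java mechanism; Tag and capturedType are hypothetical names, not Flink code.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

public class TypeCaptureDemo {

    // Hypothetical stand-in for OutputTag<T>; only the type-capture trick is modeled.
    static class Tag<T> {
        final String id;
        Tag(String id) { this.id = id; }

        // Reads T from the generic superclass of the runtime class, if there is one.
        Type capturedType() {
            Type superType = getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType) {
                return ((ParameterizedType) superType).getActualTypeArguments()[0];
            }
            return null; // plain `new Tag<...>(...)`: superclass is Object, T was erased
        }
    }

    public static void main(String[] args) {
        Tag<List<String>> plain = new Tag<List<String>>("plain");
        Tag<List<String>> anon = new Tag<List<String>>("anon") {};
        System.out.println(plain.capturedType()); // null
        System.out.println(anon.capturedType());  // java.util.List<java.lang.String>
    }
}
```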
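Application: retaining abnormal data

Example: records that arrive after their window's time range has closed (late data) are sent to a separate side stream (sideOutput_dayle_data1 in the code below).

Key steps

1. Define an OutputTag.

2. Set how late the window's data is allowed to arrive (allowedLateness).

3. Route the late data to the tag (sideOutputLateData) and retrieve it with getSideOutput.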
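For the settings used in the code below (10 s tumbling windows, a 5 s out-of-orderness bound, 2 s allowed lateness, timestamp = f1 × 1000 ms), the thresholds for the first window can be worked out by hand. This assumes Flink's bounded-out-of-orderness generator, which emits the largest timestamp seen minus the bound minus 1 ms, and a single effective watermark (see the idleness comment in the code).

```latex
\begin{aligned}
&\text{window } [0,\ 10000)\ \text{ms}: && \mathit{maxTimestamp} = 10000 - 1 = 9999\\
&\text{watermark after largest timestamp } T: && W = T - 5000 - 1\\
&\text{first firing}: && W \ge 9999 \iff T \ge 15000 \quad (\text{input } a\ 15)\\
&\text{state cleanup}: && W \ge 9999 + 2000 = 11999 \iff T \ge 17000 \quad (\text{input } a\ 17)
\end{aligned}
```

While 9999 ≤ W < 11999, a record for [0, 10000) is still accepted and fires the window again at once; from W = 11999 on, it goes to the late-data side output instead. This matches the Input/Output listing further down: the window first fires after a 15, fires again for each later a 9, a 8 and a 1, and the final a 1 entered after a 17 shows up as latedata.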

Code:

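import java.time.Duration;
import java.util.Iterator;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class TransformOperator {

    // Side output tags; each must be an anonymous inner class (note the trailing {})
    // Input records whose value is even
    private static OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("dayle_data2"){};
    // Late data: records that arrive after their window has been cleaned up
    private static OutputTag<Tuple2<String, Integer>> sideOutput_dayle_data1 = new OutputTag<Tuple2<String, Integer>>("sideOutput_dayle_data1"){};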

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Must be an anonymous inner class
        OutputTag<String> erroOutputTag = new OutputTag<String>("formatExceptionData"){};

        SingleOutputStreamOperator<Tuple2<String, Integer>> socketTextStream = env
                .socketTextStream("10.164.29.143", 10086)
                // Parse the input and send malformed records to the side output
                .process(new FlatMapProcess(erroOutputTag));

        SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Stream = ReduceWindowPrint(socketTextStream, 5);

        // Side stream of source records that failed to parse
        socketTextStream.getSideOutput(erroOutputTag).map((MapFunction<String, String>) e -> "parseException:\t" + e).print();
        // Late data
        tuple2Stream.getSideOutput(sideOutput_dayle_data1).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> e) throws Exception {
                return Tuple2.of("latedata:\t" + e.f0, e.f1);
            }
        }).print();
        // Input records whose value is even
        tuple2Stream.getSideOutput(outputTag).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
                return Tuple2.of("o_" + value.f0, value.f1);
            }
        }).print();

        // Main stream of normally processed records
        tuple2Stream.print();

        env.execute("event time process ");
    }

    
    private static SingleOutputStreamOperator<Tuple2<String, Integer>> ReduceWindowPrint(SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreams, int dayleTime) {

        return dataStreams
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(dayleTime))
                                .withTimestampAssigner((e, t) -> e.f1 * 1000L)
                                // With parallelism > 1, the window operator's watermark is the minimum over
                                // all of its input channels. If one upstream subtask receives no data, its
                                // watermark never advances and no window fires downstream.
                                // withIdleness marks such a subtask idle after the given time (10 s here), so it
                                // is ignored and the other subtasks' watermarks can trigger the windows.
                                .withIdleness(Duration.ofSeconds(10))
                )
                .keyBy(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // How late (measured by the watermark) data of an already-fired window may still arrive
                .allowedLateness(Time.seconds(2))
                // Send data that arrives later than that to the given side output
                .sideOutputLateData(sideOutput_dayle_data1)
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {

                    int i;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        i = 0;
                    }

                    @Override
                    public synchronized void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
                        Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
                        while (iterator.hasNext()) {
                            Tuple2<String, Integer> next = iterator.next();
                            out.collect(next);
                            if (next.f1 % 2 == 0) {
                                // Send the record to the side output
                                context.output(outputTag, Tuple2.of("side-" + next.f0, next.f1));
                            }
                        }
                        System.out.println("=======================  ^^^ window_num:\t" + i++ + " ^^^ ==================");
                        System.out.println("======================= range: [" + context.window().getStart() + "___" + context.window().getEnd() + " )");
                        System.out.println("======================= key: " + s );
                        System.out.println("======================= watermark: " + context.currentWatermark());
                        System.out.println("======================================================");
                    }
                });
    }
}


class FlatMapProcess extends ProcessFunction<String, Tuple2<String, Integer>> {

    private OutputTag<String> outputTag;

    public FlatMapProcess(OutputTag<String> outputTag) {
        this.outputTag = outputTag;
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        try {
            String[] infos = value.split("\\s+");
            if (infos.length == 2) {
                out.collect(Tuple2.of(infos[0], Integer.valueOf(infos[1])));
            } else {
                ctx.output(outputTag, "format exception format_:\t" + value);
            }
        } catch (NumberFormatException e) {
            // Collect malformed records into the side output given by outputTag
            ctx.output(outputTag, "format exception format_:\t" + value);
        }
    }
}
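The withIdleness comment in the code above is about how an operator with several input channels derives its watermark: it takes the minimum over its inputs, and inputs marked idle are left out of that minimum. Below is a toy model in plain Java; CombinedWatermark and Channel are hypothetical names, and the two-subtask scenario in main is invented for illustration, not taken from the run shown below.

```java
import java.util.List;

// Hypothetical model of how an operator with several input channels combines watermarks.
public class CombinedWatermark {

    // One upstream channel: its latest watermark and whether it has been marked idle.
    static final class Channel {
        final long watermark;
        final boolean idle;

        Channel(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    // Minimum over channels that are not idle. If every channel is idle there is
    // nothing to advance to; this toy model returns Long.MIN_VALUE in that case.
    static long combine(List<Channel> channels) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Channel c : channels) {
            if (!c.idle) {
                anyActive = true;
                min = Math.min(min, c.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Invented scenario: subtask 0 has seen a 16 (watermark 10999), subtask 1 only a 0 (watermark -5001).
        Channel busy = new Channel(10_999, false);
        System.out.println(combine(List.of(busy, new Channel(-5_001, false)))); // -5001: window [0, 10000) cannot fire
        System.out.println(combine(List.of(busy, new Channel(-5_001, true))));  // 10999: idle channel ignored
    }
}
```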

Input

a 

a 0
a 1
a 10
a 15
a 16
a 10
a 9
a 8
a 1
a 12
a 1
a 15
a 1
a 17
a 1

Output

2> parseException:	format exception format_:	a 
1> parseException:	format exception format_:	
2> (a,0)
2> (o_side-a,0)
2> (a,1)
=======================  ^^^ window_num:	0 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 9999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
=======================  ^^^ window_num:	1 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
2> (a,8)
2> (o_side-a,8)
=======================  ^^^ window_num:	2 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
2> (a,8)
2> (o_side-a,8)
2> (a,1)
=======================  ^^^ window_num:	3 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
2> (a,8)
2> (o_side-a,8)
2> (a,1)
2> (a,1)
=======================  ^^^ window_num:	4 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
2> (a,8)
2> (o_side-a,8)
2> (a,1)
2> (a,1)
2> (a,1)
=======================  ^^^ window_num:	5 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (latedata:	a,1)    -- entered after a 17, so its window has already been cleaned up and the record goes to the late-data side output
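The trace above can be replayed without a cluster. The sketch below is a simplified single-stream model, not Flink's WindowOperator: it assumes one effective watermark (parallelism 1, or idleness letting the active subtask's watermark through), a watermark update after every record (the records were typed by hand, far slower than periodic watermark emission), and the rules that a window whose max timestamp + allowed lateness ≤ watermark no longer accepts records, while an accepted record for a window already past its end fires it immediately. LateDataReplay, replay and the log format are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified single-stream replay of the example job (hypothetical; not Flink's WindowOperator).
// Settings from the code: 10 s tumbling windows, 5 s out-of-orderness, 2 s allowed lateness,
// timestamp = value * 1000 ms. Assumes one watermark, advanced after every record.
public class LateDataReplay {
    static final long SIZE = 10_000, OUT_OF_ORDER = 5_000, LATENESS = 2_000;

    private final Map<Long, List<Integer>> windows = new LinkedHashMap<>(); // start -> contents
    private final List<String> log = new ArrayList<>();
    private long maxTs = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;

    private void onElement(int value) {
        long ts = value * 1000L;
        long start = ts - (ts % SIZE);                 // tumbling window, offset 0, ts >= 0
        long maxTimestamp = start + SIZE - 1;          // last millisecond of the window
        if (maxTimestamp + LATENESS <= watermark) {
            log.add("late " + value);                  // window cleaned up: sideOutputLateData
        } else {
            List<Integer> contents = windows.computeIfAbsent(start, k -> new ArrayList<>());
            contents.add(value);
            if (maxTimestamp <= watermark) {           // late but within allowedLateness
                log.add("fire " + start + " " + contents);
            }
        }
        maxTs = Math.max(maxTs, ts);
        advanceWatermark(maxTs - OUT_OF_ORDER - 1);    // bounded out-of-orderness watermark
    }

    private void advanceWatermark(long candidate) {
        long previous = watermark;
        watermark = Math.max(watermark, candidate);    // watermarks never go backwards
        for (Long start : new ArrayList<>(windows.keySet())) {
            long maxTimestamp = start + SIZE - 1;
            if (maxTimestamp > previous && maxTimestamp <= watermark) {
                log.add("fire " + start + " " + windows.get(start)); // on-time firing
            }
            if (maxTimestamp + LATENESS <= watermark) {
                windows.remove(start);                 // window state is purged
            }
        }
    }

    static List<String> replay(int... values) {
        LateDataReplay sim = new LateDataReplay();
        for (int v : values) {
            sim.onElement(v);
        }
        return sim.log;
    }

    public static void main(String[] args) {
        // Well-formed records from the Input listing, in order (the two malformed lines are omitted)
        replay(0, 1, 10, 15, 16, 10, 9, 8, 1, 12, 1, 15, 1, 17, 1).forEach(System.out::println);
    }
}
```

Running it prints six "fire 0 [...]" lines, from "fire 0 [0, 1]" up to "fire 0 [0, 1, 9, 8, 1, 1, 1]", and then "late 1", the same sequence as window_num 0 to 5 and the final latedata record above. Because the window state is kept until cleanup, each re-firing emits the full window contents again, which is why (a,0) and (o_side-a,0) repeat in the output.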
Reposting notice: this article is reposted from www.mshxw.com
Article URL: https://www.mshxw.com/it/722502.html