栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink 的侧输出 和一个流拆分成多个流

Flink 的侧输出 和一个流拆分成多个流

1. Flink 侧输出流

官网 :https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/side_output/

1.1 理解
 侧输出流就是将问题数据或者 不符合条件的数据进行输出到数据库中或者打印出来, 就形成一个正确的流和一个不符合条件的流。
1.2 实例
   案例:  判断 输入的字符串是否等于 "big"  如果等于big  就输入到  主输出流  否则将抛出异常
package com.wudl.flink.stream;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import static org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.Media.print;



public class Flink_SideOutput {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource dsStream = env.socketTextStream("192.168.1.130", 9999);
        OutputTag outputTag = new OutputTag("number") {
        };
        SingleOutputStreamOperator process = dsStream.process(new ProcessFunction() {
            @Override
            public void processElement(String s, Context ctx, Collector collector) throws Exception {

                // 判断 输入的字符串是否等于 "big"  如果等于big  就输入到  主输出流  否则将抛出异常
                   try {
                       if (s.equals("big")) {
                           collector.collect(s);
                       }else
                       {
                           throw new Exception();
                       }
                   }catch (Exception e)
                   {
                     //  e.printStackTrace();
                       ctx.output(outputTag,s);
                   }
            }
        });

        // 打印主流
        process.print("打印主流 ----->");
        //  打印错误的的流或者是 不符合条件的流
        process.getSideOutput(outputTag).print("侧输出流----->");
        env.execute();

    }

}

1.3结果

3. Flink 使用侧输出流把一个流拆成多个流

Flink 12 以后官方建议我们使用多侧输入流来输出

3.1实例:
 根据输入的数据 进行条件判断然后输入到不同的侧输入流中。
package com.wudl.flink.stream;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;



public class Flink_SideOutputTwo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource socketTextStream = env.socketTextStream("192.168.1.130", 9999);
        OutputTag outputTag6 = new OutputTag("number6-10") {
        };

        OutputTag outputTag10 = new OutputTag("number10") {
        };

        SingleOutputStreamOperator process = socketTextStream.process(new ProcessFunction() {
            @Override
            public void processElement(String s, Context cxt, Collector out) throws Exception {
                int number = Integer.parseInt(s);
                if (number < 5) {
                    out.collect(number);
                } else if (number > 5 && number < 10) {
                    cxt.output(outputTag6, number);
                } else {
                    cxt.output(outputTag10, number);
                }
            }
        });
        process.print(" 主流-------->");
        process.getSideOutput(outputTag6).print("侧输入流--  大于5 小于10 的数据---");

        process.getSideOutput(outputTag10).print("侧输入流--  大于10 的数据---");

        env.execute();


    }

}

3.2 结果


********************************************************** 应用的实例***********************************************************************

4. flink 侧输入流的实例应用
案例: 通过开窗将迟到的数据输出到侧输入流中 , 
package com.wudl.flink.stream;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;



public class Flink_SideOutput_Examples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource stringDataStreamSource = env.socketTextStream("192.168.1.130", 9999);
        SingleOutputStreamOperator map = stringDataStreamSource.map(new MapFunction() {
            @Override
            public WaterSensor map(String s) throws Exception {
                String[] split = s.split(",");
                return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.parseInt(split[2]));
            }
        });

        // 创建产生水印策略
        WatermarkStrategy wms = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner(new SerializableTimestampAssigner() {
                    @Override
                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                        return element.getTs() * 1000;
                    }
                });


        SingleOutputStreamOperator process = map.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(new OutputTag("side_1") {
                })
                .process(new ProcessWindowFunction() {
                    @Override
                    public void process(String key, Context context, Iterable elements, Collector out) throws Exception {
                        String msg = "当前key: " + key + " 窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 "
                                + elements.spliterator().estimateSize() + "条数据" +
                                "watermark: " + context.currentWatermark();

                        out.collect(context.window().toString());
                        out.collect(msg);
                    }
                });


        process.print("主输出流---->");
        process.getSideOutput(new OutputTag("side_1"){}).print("侧输出流-------->");

        env.execute();

    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/612488.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号