栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink侧流输出

Flink侧流输出

侧流输出示例
/**
 * Demonstrates Flink side outputs: every line read from a socket is routed to one of three
 * side outputs, depending on whether it parses as an even integer, an odd integer, or not as
 * an integer at all.
 *
 * <p>The process function never calls {@code out.collect}, so the main stream itself stays
 * empty and the "主流" sink prints nothing; all data flows through the side outputs.
 */
public class SideOutputDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);
        // The element type of an OutputTag must be available at runtime: either pass the
        // TypeInformation explicitly (as below) or create an anonymous subclass, e.g.
        // new OutputTag<Integer>("odd-data") {};
        OutputTag<Integer> oddTag = new OutputTag<>("odd-data", Types.INT);
        OutputTag<Integer> evenTag = new OutputTag<>("even-data", Types.INT);
        OutputTag<String> strTag = new OutputTag<>("str-data", Types.STRING);

        SingleOutputStreamOperator<String> mainStream = lines.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String line, Context ctx, Collector<String> out) throws Exception {
                int num;
                try {
                    num = Integer.parseInt(line);
                } catch (NumberFormatException e) {
                    // Not an integer: emit the raw line to the string side output.
                    ctx.output(strTag, line);
                    return;
                }
                if (num % 2 == 0) {
                    // Even number.
                    ctx.output(evenTag, num);
                } else {
                    // Odd number (also covers negative odds, where num % 2 == -1).
                    ctx.output(oddTag, num);
                }
            }
        });

        DataStream<Integer> evenStream = mainStream.getSideOutput(evenTag);
        DataStream<Integer> oddStream = mainStream.getSideOutput(oddTag);
        DataStream<String> strStream = mainStream.getSideOutput(strTag);

        evenStream.print("偶数流");
        oddStream.print("奇数流");
        strStream.print("字符流");

        mainStream.print("主流");
        env.execute();
    }
}
使用侧流输出获取窗口中迟到的数据
public class GetWindowLateDataDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //开启checkpointing
        env.enableCheckpointing(10000);
        //设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000));

        DataStreamSource lines = env.socketTextStream("linux01", 7777);
        //获取Watermark
        SingleOutputStreamOperator linesWithWatermark = lines.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {
            @Override
            public long extractTimestamp(String element, long recordTimestamp) {
                return Long.parseLong(element.split(",")[0]);
            }
        }));

        SingleOutputStreamOperator> wordAndCount = linesWithWatermark.map(new MapFunction>() {
            @Override
            public Tuple2 map(String line) throws Exception {
                if (line.startsWith("error")){
                    throw new RuntimeException("错误数据");
                }
                String[] fields = line.split(",");
                return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
            }
        });
        //按照单词keyBy
        KeyedStream, String> keyedStream = wordAndCount.keyBy(tp -> tp.f0);
        //划分窗口
        WindowedStream, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
        //给迟到的数据打上标签
        OutputTag> lateDataTag = new OutputTag>("late-data"){};
        windowedStream.sideOutputLateData(lateDataTag);
        SingleOutputStreamOperator> result = windowedStream.sum(1);
        //获取迟到的数据
        DataStream> lateDataStream = result.getSideOutput(lateDataTag);

        result.print();
        lateDataStream.print("迟到的数据");
        env.execute();


    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745587.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号