Flink in Java: using side outputs to split and copy a stream

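In Flink, a ProcessFunction can split a stream. Besides emitting records to its main output through the Collector, processElement can send records to side outputs, each identified by an OutputTag, through Context.output. This lets one stream be split into several streams or copied. The code below reads a plain text file, splits each line into (word, 1) pairs, then sends (word, 2) to the main output and the unchanged (word, 1) to a side output, and prints both streams: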

Example code
package wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;


public class manyOutWordCount {

    public static void main(String[] args) throws Exception {
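        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set the default parallelism to 3 before building the pipeline so it applies to every operator created afterwards
        env.setParallelism(3);
        // 2. Read the data from a text file
        DataStream<String> dataStream = env.readTextFile("src/main/resources/hello.txt");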
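        // 3. Split each line on spaces; every element of the resulting stream is new Tuple2<>(word, 1)
        DataStream<Tuple2<String, Integer>> sensorStream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {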
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] wordString = value.split(" ");
                for (String wordLine : wordString) {
                    out.collect(new Tuple2<>(wordLine, 1));
                }
            }
        });

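        // Side output: the tag identifies the split-off stream.
        // The trailing {} creates an anonymous subclass so Flink can capture the generic element type.
        final OutputTag<Tuple2<String, Integer>> sideStream = new OutputTag<Tuple2<String, Integer>>("te") {
        };
        SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = sensorStream.process(new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {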
            @Override
            public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
                out.collect(new Tuple2<>(value.f0, 2)); // main output (mainDataStream) emits (word, 2)
                ctx.output(sideStream, value);          // side output (sideStream) emits the original (word, 1)
            }
        });
        DataStream<Tuple2<String, Integer>> sideOutput = mainDataStream.getSideOutput(sideStream); // fetch the side-output records

        sideOutput.print();
        mainDataStream.print();

        // Execute the job
        env.execute();
    }
}
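The example above copies every element into both outputs. Splitting works the same way, except that processElement picks one output per element. The following is a minimal sketch under stated assumptions: the class name SplitWordStream, the tag ids "hello-words" and "long-words", and the routing rules (the word "hello" versus words longer than four characters) are hypothetical, and fromElements replaces the text file so the job runs without hello.txt.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hypothetical variant of the article's job: route each word to exactly one output.
public class SplitWordStream {

    // Hypothetical tags; the anonymous subclass ({}) lets Flink capture the generic element type.
    static final OutputTag<Tuple2<String, Integer>> HELLO_TAG =
            new OutputTag<Tuple2<String, Integer>>("hello-words") {};
    static final OutputTag<Tuple2<String, Integer>> LONG_TAG =
            new OutputTag<Tuple2<String, Integer>>("long-words") {};

    // Hypothetical routing rule, kept in a plain static method so it can be checked without a job.
    // Returns the side-output tag for a word, or null if the word stays on the main output.
    static OutputTag<Tuple2<String, Integer>> tagFor(String word) {
        if ("hello".equals(word)) {
            return HELLO_TAG;
        }
        if (word.length() > 4) {
            return LONG_TAG;
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> words = env.fromElements(
                Tuple2.of("hello", 1), Tuple2.of("world", 1), Tuple2.of("ack", 1), Tuple2.of("off", 1));

        SingleOutputStreamOperator<Tuple2<String, Integer>> mainStream = words.process(
                new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx,
                                               Collector<Tuple2<String, Integer>> out) {
                        OutputTag<Tuple2<String, Integer>> tag = tagFor(value.f0);
                        if (tag == null) {
                            out.collect(value);     // stays on the main output
                        } else {
                            ctx.output(tag, value); // goes to one side output only, not copied to main
                        }
                    }
                });

        // print(String) prefixes each line with the given sink identifier
        mainStream.print("main");
        mainStream.getSideOutput(HELLO_TAG).print("hello");
        mainStream.getSideOutput(LONG_TAG).print("long");

        env.execute("split-word-stream");
    }
}
```

With these four elements, "hello" is routed to the hello side output, "world" to the long side output, and "ack" and "off" stay on the main output; the interleaving of printed lines across parallel subtasks is not deterministic. Any number of OutputTags can be used, and an element that is never passed to out.collect does not appear in the main stream.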

The input file hello.txt contains:

hello world
hello flink
hello spark
When we have shuffled off this mortal coil
When we have shuffled off this mortal coil
ack
hello world
hello flink
hello spark
When we have shuffled off this mortal coil
When we have shuffled off this mortal coil
ack
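Because the side output still carries (word, 1) pairs, it can feed an ordinary word count. The sketch below is a hypothetical continuation, not part of the original code: the class SideOutputWordCount, the tag id "words", the helper countSideOutput, and the use of fromElements with the first three lines of hello.txt are assumptions. It keys the side output by word, sums field 1, and runs in BATCH mode so each word yields one final count instead of a running update per record (assuming Flink 1.12 or later, where setRuntimeMode and executeAndCollect exist).

```java
import java.util.List;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputWordCount {

    // Hypothetical tag id; the anonymous subclass ({}) preserves the generic element type.
    static final OutputTag<Tuple2<String, Integer>> WORDS =
            new OutputTag<Tuple2<String, Integer>>("words") {};

    // Hypothetical helper: same split-and-copy pipeline as the article, then a per-word count of the side output.
    static List<Tuple2<String, Integer>> countSideOutput(String... lines) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bounded input + BATCH mode: sum() emits only the final count for each word.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Tuple2<String, Integer>> pairs = env.fromElements(lines)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Integer>> main = pairs.process(
                new ProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                    @Override
                    public void processElement(Tuple2<String, Integer> value, Context ctx,
                                               Collector<Tuple2<String, Integer>> out) {
                        out.collect(new Tuple2<>(value.f0, 2)); // main output, as in the article
                        ctx.output(WORDS, value);               // side output keeps (word, 1)
                    }
                });

        // Key the side output by word, sum field 1, and collect the results back to the client.
        return main.getSideOutput(WORDS)
                .keyBy(t -> t.f0)
                .sum(1)
                .executeAndCollect(100);
    }

    public static void main(String[] args) throws Exception {
        countSideOutput("hello world", "hello flink", "hello spark").forEach(System.out::println);
    }
}
```

For those three lines the counts are hello = 3 and world, flink, spark = 1 each; the order in which the tuples are returned may vary between runs.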