
Flink Table & SQL: Upgrading from OutputSelector to OutputTag to Split a Data Stream (with a Side-by-Side Example)


Contents
  • Example description
  • OutputSelector implementation in Flink 1.10
  • OutputTag implementation in Flink 1.12

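Example description

We use Flink's SourceFunction interface to implement a custom real-time data source.
The goal is to split the real-time item stream into two streams, even and odd (by whether the item id is even or odd), join the two streams on equal item names, and print the join result.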
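
The listings in this article rely on an Item class that is never shown. The sketch below is an assumption about its shape (a String name and an int id, matching the setName/setId/getId calls in the listings), together with a plain-Java illustration, without Flink, of what the even/odd split and the join on name compute for a small finite list. The names SplitJoinSketch, selectByParity and joinOnName are hypothetical and only serve this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical Item class: the article uses it but does not show it.
// In a real Flink job, declare it public in its own file (Item.java) so that
// Flink can treat it as a POJO (public class, no-arg constructor, getters/setters).
class Item {
    private String name;
    private int id;

    public Item() {}

    public Item(String name, int id) {
        this.name = name;
        this.id = id;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    @Override
    public String toString() {
        return "Item{name='" + name + "', id=" + id + "}";
    }
}

// Plain-Java illustration of the split-then-join logic on a finite list.
class SplitJoinSketch {

    // Keep the items whose id parity matches: even ids if wantEven, odd ids otherwise.
    static List<Item> selectByParity(List<Item> items, boolean wantEven) {
        List<Item> out = new ArrayList<>();
        for (Item item : items) {
            if ((item.getId() % 2 == 0) == wantEven) {
                out.add(item);
            }
        }
        return out;
    }

    // Inner join on name; each row is "evenId,evenName,oddId,oddName",
    // mirroring the column order of the SQL query in the listings.
    static List<String> joinOnName(List<Item> even, List<Item> odd) {
        List<String> rows = new ArrayList<>();
        for (Item a : even) {
            for (Item b : odd) {
                if (a.getName().equals(b.getName())) {
                    rows.add(a.getId() + "," + a.getName() + "," + b.getId() + "," + b.getName());
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Item> items = new ArrayList<>();
        items.add(new Item("HAT", 2));
        items.add(new Item("TIE", 3));
        items.add(new Item("HAT", 5));
        items.add(new Item("SHOE", 8));

        List<Item> even = selectByParity(items, true);
        List<Item> odd = selectByParity(items, false);
        for (String row : joinOnName(even, odd)) {
            System.out.println(row);
        }
    }
}
```

With these four sample items, the only even/odd pair that shares a name is HAT 2 and HAT 5, so main prints the single row 2,HAT,5,HAT. The Flink job computes the same kind of result, but continuously over an unbounded stream, which is why it emits a retract stream instead of a fixed list.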

OutputSelector implementation in Flink 1.10
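import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Item is assumed to be a POJO with a String name and an int id; the article does not show it.
public class MyStreamingSource implements SourceFunction<Item> {

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            Item item = generateItem();
            ctx.collect(item);

            // emit one record per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }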

    // generate one random item record
    private Item generateItem() {
        Random random = new Random();
        int i = random.nextInt(100);
        ArrayList<String> list = new ArrayList<>();
        list.add("HAT");
        list.add("TIE");
        list.add("SHOE");
        Item item = new Item();
        item.setName(list.get(random.nextInt(3)));
        item.setId(i);
        return item;
    }
}


class StreamingDemo {
    
    public static void main(String[] args) throws Exception {

        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        // identity map; the result is typed as SingleOutputStreamOperator<Item>
        SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStreamingSource()).map(new MapFunction<Item, Item>() {
            @Override
            public Item map(Item item) throws Exception {
                return item;
            }
        });

        // split() tags every record as "even" or "odd"; select("even") keeps the even ids
        DataStream<Item> evenSelect = source.split(new OutputSelector<Item>() {
            @Override
            public Iterable<String> select(Item value) {
                List<String> output = new ArrayList<>();
                if (value.getId() % 2 == 0) {
                    output.add("even");
                } else {
                    output.add("odd");
                }
                return output;
            }
        }).select("even");

        // same selector again; select("odd") keeps the odd ids
        DataStream<Item> oddSelect = source.split(new OutputSelector<Item>() {
            @Override
            public Iterable<String> select(Item value) {
                List<String> output = new ArrayList<>();
                if (value.getId() % 2 == 0) {
                    output.add("even");
                } else {
                    output.add("odd");
                }
                return output;
            }
        }).select("odd");


        bsTableEnv.createTemporaryView("evenTable", evenSelect, "name,id");
        bsTableEnv.createTemporaryView("oddTable", oddSelect, "name,id");

        Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");

        queryTable.printSchema();

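        // generic parameters reconstructed from the four selected columns (id, name, id, name)
        bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer, String, Integer, String>>(){})).print();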

        bsEnv.execute("streaming sql job");
    }

}
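
OutputTag implementation in Flink 1.12

DataStream#split and OutputSelector were removed in Flink 1.12; side outputs (OutputTag) are the replacement, so the split step is rewritten here with a ProcessFunction.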
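import java.util.ArrayList;
import java.util.Random;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Item is assumed to be a POJO with a String name and an int id; the article does not show it.
public class MyStreamingSource implements SourceFunction<Item> {

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Item> ctx) throws Exception {
        while (isRunning) {
            Item item = generateItem();
            ctx.collect(item);

            // emit one record per second
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }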

    // generate one random item record
    private Item generateItem() {
        Random random = new Random();
        int i = random.nextInt(100);
        ArrayList<String> list = new ArrayList<>();
        list.add("HAT");
        list.add("TIE");
        list.add("SHOE");
        Item item = new Item();
        item.setName(list.get(random.nextInt(3)));
        item.setId(i);
        return item;
    }
}


class StreamingDemo {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
        // first define two side outputs to hold the split records
        OutputTag<Item> outputTag1 = new OutputTag<Item>("even") {}; // even ids
        OutputTag<Item> outputTag2 = new OutputTag<Item>("odd") {};  // odd ids
        SingleOutputStreamOperator<Item> source = bsEnv.addSource(new MyStreamingSource()).map(new MapFunction<Item, Item>() {
            @Override
            public Item map(Item item) throws Exception {
                return item;
            }
        });

        SingleOutputStreamOperator<Item> result = source.process(new ProcessFunction<Item, Item>() {
            @Override
            public void processElement(Item value, Context ctx, Collector<Item> out) throws Exception {
                // nothing goes to the main output; each record is routed to exactly one side output
                if (value.getId() % 2 == 0) {
                    ctx.output(outputTag1, value);
                } else {
                    ctx.output(outputTag2, value);
                }
            }
        });
        DataStream<Item> so1 = result.getSideOutput(outputTag1);
        DataStream<Item> so2 = result.getSideOutput(outputTag2);
        bsTableEnv.createTemporaryView("evenTable", so1);
        bsTableEnv.createTemporaryView("oddTable", so2);
        Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");
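        // generic parameters reconstructed from the four selected columns (id, name, id, name)
        bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint<Tuple4<Integer, String, Integer, String>>(){})).print();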

        bsEnv.execute("streaming sql job");
    }

}

If you repost this article, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/673576.html