栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink--广播状态

Flink--广播状态

public class BroadcastStateDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //维度流
        //c001,图书,INSERT
        //c002,电脑,INSERT
        //c002,文具,UPDATE
        //c002,文具,DELETE
        DataStreamSource lines1 = env.socketTextStream("linux01", 7777);
        //事实流
        //o001,1000,c001
        //o002,2000,c002
        DataStreamSource lines2 = env.socketTextStream("linux01", 8888);
        //期望得到的数据
        //o001,1000,c001,图书

        //对维度流进行处理
        SingleOutputStreamOperator> tpStream1 = lines1.map(new MapFunction>() {
            @Override
            public Tuple3 map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(fields[0], fields[1], fields[2]);
            }
        });

        //将维度流数据以MapState的形式广播到下游
        MapStateDescriptor stateDescriptor = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.STRING);
        BroadcastStream> broadcastStream = tpStream1.broadcast(stateDescriptor);

        //对事实流进行处理
        SingleOutputStreamOperator> tpStream2 = lines2.map(new MapFunction>() {
            @Override
            public Tuple3 map(String input) throws Exception {
                String[] fields = input.split(",");
                return Tuple3.of(fields[0], Double.parseDouble(fields[1]), fields[2]);
            }
        });

        //将两个流connect 可以共享状态
        SingleOutputStreamOperator> result = tpStream2.connect(broadcastStream).process(new BroadcastProcessFunction, Tuple3, Tuple4>() {
            //处理事实流
            @Override
            public void processElement(Tuple3 input, ReadonlyContext ctx, Collector> out) throws Exception {
                //获取状态
                ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(stateDescriptor);
                out.collect(Tuple4.of(input.f0, input.f1, input.f2, broadcastState.get(input.f2)));
            }

            //处理广播流
            @Override
            public void processBroadcastElement(Tuple3 input, Context ctx, Collector> out) throws Exception {
                //获取状态
                BroadcastState broadcastState = ctx.getBroadcastState(stateDescriptor);
                String cid = input.f0;
                String cname = input.f1;
                String type = input.f2;
                if ("DELETE".equals(type)) {
                    //删除状态中对应的数据
                    broadcastState.remove(cid);
                } else {
                    //添加,更新状态中的数据
                    broadcastState.put(cid, cname);
                }
            }
        });

        result.print();

        env.execute();


    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/745151.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号