Flink BroadcastState: connecting an event stream with a broadcast stream (Java version)

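Requirements:

Event stream (in Kafka): userID, eventTime, eventType, productID

Broadcast stream (in MySQL): userID, userName, userAge

1. Use the user data from the broadcast stream to complete each event-stream record: userID, eventTime, eventType, productID, userName, userAge

2. When the broadcast-side data is modified, newly merged results pick up the change in real time (the event stream can see changes to the broadcast data).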
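
The core code later in this article refers to three record types, Operation, UserInfo and UserOperation, without showing them. Below is a minimal sketch of what they might look like. It assumes public fields named after the columns listed above and a toString() that matches the console output in the test section; the no-argument constructors and Serializable are additions so that Flink can treat the classes as POJOs.

```java
import java.io.Serializable;

// Event record parsed from Kafka (field names assumed from the requirements above).
class Operation implements Serializable {
    public String userID;
    public String eventTime;
    public String eventType;
    public int productID;

    public Operation() {}

    public Operation(String userID, String eventTime, String eventType, int productID) {
        this.userID = userID;
        this.eventTime = eventTime;
        this.eventType = eventType;
        this.productID = productID;
    }
}

// User row read from MySQL; the userID travels as the map key, not as a field.
class UserInfo implements Serializable {
    public String userName;
    public int userAge;

    public UserInfo() {}

    public UserInfo(String userName, int userAge) {
        this.userName = userName;
        this.userAge = userAge;
    }
}

// Enriched result: the event fields plus the user fields.
class UserOperation implements Serializable {
    public String userID;
    public String eventTime;
    public String eventType;
    public int productID;
    public String userName;
    public int userAge;

    public UserOperation() {}

    public UserOperation(String userID, String eventTime, String eventType,
                         int productID, String userName, int userAge) {
        this.userID = userID;
        this.eventTime = eventTime;
        this.eventType = eventType;
        this.productID = productID;
        this.userName = userName;
        this.userAge = userAge;
    }

    @Override
    public String toString() {
        return "UserOperation{userID='" + userID + "', eventTime='" + eventTime
                + "', eventType='" + eventType + "', productID=" + productID
                + ", userName='" + userName + "', userAge=" + userAge + "}";
    }
}
```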


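Possible approaches:

1. Consume the Kafka data in Flink and, inside a MapFunction, query MySQL directly to complete each record. Performance is poor, because every incoming record triggers a fresh MySQL query.

2. Put the broadcast-side data into Redis and, inside a MapFunction, look it up in Redis to complete each record. Performance is tolerable, but every incoming record still triggers a Redis lookup.

3. Do a two-stream join in Flink. However, whenever the MySQL data is updated, the MySQL-side stream has to pick up the change promptly, which is inefficient.

4. Use a two-stream connect plus BroadcastState (state for the broadcast stream). The broadcast stream keeps reading the latest data from MySQL and stores it in BroadcastState, and the event stream reads the broadcast data from BroadcastState. This is efficient, because the lookup is local to the operator.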
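
The difference between options 1 and 2 on one side and option 4 on the other comes down to where the lookup happens. The toy simulation below (plain Java, no Flink) counts how many calls reach an external store when events are enriched one by one versus from a local copy that is loaded in bulk, which is roughly what broadcast state provides. RemoteUserStore, LookupCostDemo and the call counter are hypothetical names made up for this illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for MySQL or Redis that counts how often it is queried.
class RemoteUserStore {
    private final Map<String, String> rows = new HashMap<>();
    int calls = 0;

    RemoteUserStore() {
        rows.put("user_1", "user_name1");
        rows.put("user_3", "user_name3");
    }

    // One remote round trip per key (options 1 and 2).
    String lookup(String userID) {
        calls++;
        return rows.get(userID);
    }

    // One remote round trip for the whole table (option 4, once per refresh).
    Map<String, String> snapshot() {
        calls++;
        return new HashMap<>(rows);
    }
}

class LookupCostDemo {
    // Options 1 and 2: query the store for every event.
    static int perEventLookups(List<String> eventUserIDs) {
        RemoteUserStore store = new RemoteUserStore();
        for (String id : eventUserIDs) {
            store.lookup(id);
        }
        return store.calls;
    }

    // Option 4: load a local copy once, then enrich from memory.
    static int localCopyLookups(List<String> eventUserIDs) {
        RemoteUserStore store = new RemoteUserStore();
        Map<String, String> local = store.snapshot();
        for (String id : eventUserIDs) {
            local.get(id); // in-memory lookup, no remote call
        }
        return store.calls;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("user_3", "user_1", "user_3", "user_3");
        System.out.println("per-event remote calls: " + perEventLookups(events));
        System.out.println("local-copy remote calls: " + localCopyLookups(events));
    }
}
```

In the actual job the table is re-read on a fixed interval, so the remote cost depends on the refresh interval rather than on the event volume.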

Core code

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "zcx4:9092,zcx5:9092,zcx6:9092");
        FlinkKafkaConsumer<String> kafka = new FlinkKafkaConsumer<>("zcx1", new SimpleStringSchema(), properties);

        // Event stream: parse each JSON record from Kafka into an Operation
        // (JSON / JSONObject are assumed to be fastjson's, com.alibaba.fastjson)
        DataStreamSource<String> kafkadataStreamSource = env.addSource(kafka);
        SingleOutputStreamOperator<Operation> kafkaDs = kafkadataStreamSource.process(new ProcessFunction<String, Operation>() {

            @Override
            public void processElement(String s, ProcessFunction<String, Operation>.Context context, Collector<Operation> collector) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String userID = jsonObject.getString("userID");
                String eventTime = jsonObject.getString("eventTime");
                String eventType = jsonObject.getString("eventType");
                int productID = jsonObject.getIntValue("productID");
                collector.collect(new Operation(userID, eventTime, eventType, productID));
            }
        });

        // Broadcast-side source: re-reads user_info from MySQL every 10 seconds
        // and emits each row as a one-entry map (userID -> UserInfo)
        DataStreamSource<Map<String, UserInfo>> mysqlDs = env.addSource(new RichSourceFunction<Map<String, UserInfo>>() {
            // volatile because cancel() is called from a different thread than run()
            volatile boolean isRunning = true;
            Connection connection = null;
            PreparedStatement preparedStatement = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                Class.forName("com.mysql.jdbc.Driver");
                connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "root");
                preparedStatement = connection.prepareStatement("select * from user_info");
            }

            @Override
            public void close() throws Exception {
                if (null != preparedStatement) {
                    preparedStatement.close();
                }
                if (null != connection) {
                    connection.close();
                }
            }

            @Override
            public void run(SourceContext<Map<String, UserInfo>> sourceContext) throws Exception {
                while (isRunning) {
                    // try-with-resources closes the ResultSet after each poll
                    try (ResultSet resultSet = preparedStatement.executeQuery()) {
                        while (resultSet.next()) {
                            HashMap<String, UserInfo> hashMap = new HashMap<>();
                            hashMap.put(resultSet.getString("userID"),
                                    new UserInfo(resultSet.getString("userName"), resultSet.getInt("userAge")));
                            sourceContext.collect(hashMap);
                        }
                    }
                    Thread.sleep(10000);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        // Note: broadcast state is described by a MapStateDescriptor (a map),
        // which is why the MySQL source emits its rows as maps
        BroadcastStream<Map<String, UserInfo>> broadcastStream = mysqlDs.broadcast(new MapStateDescriptor<String, UserInfo>("broadcastState", String.class, UserInfo.class));

        // Connect the two streams
        BroadcastConnectedStream<Operation, Map<String, UserInfo>> connect = kafkaDs.connect(broadcastStream);
        connect.process(new BroadcastProcessFunction<Operation, Map<String, UserInfo>, UserOperation>() {
            // Must use the same name and types as the descriptor passed to broadcast()
            MapStateDescriptor<String, UserInfo> broadCastStateDescriptor = new MapStateDescriptor<String, UserInfo>("broadcastState", String.class, UserInfo.class);

            // Event side: read the user data from broadcast state (read-only here)
            @Override
            public void processElement(Operation operation, BroadcastProcessFunction<Operation, Map<String, UserInfo>, UserOperation>.ReadOnlyContext readOnlyContext, Collector<UserOperation> collector) throws Exception {
                ReadOnlyBroadcastState<String, UserInfo> broadcastState = readOnlyContext.getBroadcastState(broadCastStateDescriptor);
                // Events whose userID is not (yet) in broadcast state produce no output
                if (broadcastState.contains(operation.userID)) {
                    UserInfo userInfo = broadcastState.get(operation.userID);
                    collector.collect(new UserOperation(operation.userID, operation.eventTime, operation.eventType, operation.productID, userInfo.userName, userInfo.userAge));
                }
            }

            // Broadcast side: store the incoming user data in broadcast state
            @Override
            public void processBroadcastElement(Map<String, UserInfo> stringUserInfoMap, BroadcastProcessFunction<Operation, Map<String, UserInfo>, UserOperation>.Context context, Collector<UserOperation> collector) throws Exception {
                BroadcastState<String, UserInfo> broadcastState = context.getBroadcastState(broadCastStateDescriptor);
                // putAll stores every entry, so this also works if a map carries more than one user
                broadcastState.putAll(stringUserInfoMap);
            }
        }).print();

        env.execute();
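
To make the behaviour of the two callbacks easier to reason about, here is a plain-Java model of the same logic with no Flink dependency. It is only a sketch: EnrichmentModel, onBroadcast and onEvent are hypothetical names, an ordinary HashMap stands in for broadcast state, and records are simplified to strings.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java stand-in for the BroadcastProcessFunction above (hypothetical class).
class EnrichmentModel {
    // Plays the role of broadcast state: userID -> "userName,userAge".
    private final Map<String, String> state = new HashMap<>();

    // Mirrors processBroadcastElement: newer rows overwrite older ones.
    void onBroadcast(Map<String, String> users) {
        state.putAll(users);
    }

    // Mirrors processElement: enrich if the user is known, otherwise emit nothing.
    Optional<String> onEvent(String userID, String eventType) {
        String user = state.get(userID);
        if (user == null) {
            return Optional.empty();
        }
        return Optional.of(userID + "," + eventType + "," + user);
    }

    public static void main(String[] args) {
        EnrichmentModel model = new EnrichmentModel();

        // No broadcast data yet, so the event yields no output.
        System.out.println(model.onEvent("user_3", "browse").isPresent());

        // First poll of the user table arrives on the broadcast side.
        model.onBroadcast(Collections.singletonMap("user_3", "user_name3,30"));
        System.out.println(model.onEvent("user_3", "browse").get());

        // A later poll carries the updated age; subsequent events see it.
        model.onBroadcast(Collections.singletonMap("user_3", "user_name3,100"));
        System.out.println(model.onEvent("user_3", "browse").get());
    }
}
```

One thing the model makes visible: an event that arrives before the first broadcast element, or for a user that is not in MySQL, produces no output, because the job only collects a result inside the contains check.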

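Testing

1. After starting the Flink program, write the following record to Kafka:

{"userID": "user_3", "eventTime": "2022-02-01 12:19:47", "eventType": "browse", "productID": 1}

The console then shows:

UserOperation{userID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=30}

The two streams were merged successfully.

2. Modify the data in MySQL, setting userAge=100 for user_3, then write the same record to Kafka again (the source re-polls MySQL every 10 seconds, so allow for that delay). The console shows:

UserOperation{userID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=100}

The fields in the merged record that come from the broadcast stream now carry the latest values.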
