需求:
事件流(kafka中):userID,eventTime,eventType,productID
广播流(mysql中):userID,userName,userAge
1.根据广播流中的用户数据将事件流中的数据补全:userID,eventTime,eventType,productID,userName,userAge
2.修改广播流中的数据,新合并后的结果数据实时更新(事件流可以捕捉到广播流数据的变化)
实现方法:
1.flink消费kafka数据,用mapfunction处理数据时直接查询mysql中的数据进行补全,性能差,因为每次新到一条数据都要去mysql现查;
2.将广播流的数据放入redis中,用mapfunction处理数据时从redis中查询数据进行补全,性能还凑合,每次新到一条数据都要去redis中查;
3.在flink中实现双流join,但是如果对mysql中的数据进行更新了,该流(mysql所在的流)需要及时更新数据,效率差;
4.采用双流connect+broadCastState(广播流state),广播流会实时从mysql中读取最新数据,放入broadCastState中,事件流从broadCastState获取广播流中的数据,效率高。
核心代码
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties=new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"zcx4:9092,zcx5:9092,zcx6:9092");
FlinkKafkaConsumer kafka = new FlinkKafkaConsumer("zcx1", new SimpleStringSchema(), properties);
DataStreamSource kafkadataStreamSource = env.addSource(kafka);
SingleOutputStreamOperator kafkaDs = kafkadataStreamSource.process(new ProcessFunction() {
@Override
public void processElement(String s, ProcessFunction.Context context, Collector collector) throws Exception {
JSonObject jsonObject = JSON.parseObject(s);
String userID = jsonObject.getString("userID");
String eventTime = jsonObject.getString("eventTime");
String eventType = jsonObject.getString("eventType");
int productID = jsonObject.getIntValue("productID");
collector.collect(new Operation(userID, eventTime, eventType, productID));
}
});
DataStreamSource
测试
1.启动flink程序后,往kafka中写入数据:
{"userID": "user_3", "eventTime": "2022-02-01 12:19:47", "eventType": "browse", "productID": 1}
可以从console看到:
UserOperation{userID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=30}
双流合并成功。
2.修改mysql中的数据,将user_3的userAge=100,再次向kafka中写入上述数据,可以看到:
UserOperation{userID='user_3', eventTime='2022-02-01 12:19:47', eventType='browse', productID=1, userName='user_name3', userAge=100}
合并后的数据中 涉及到原来广播流的数据是最新的。