public class BroadcastStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//维度流
//c001,图书,INSERT
//c002,电脑,INSERT
//c002,文具,UPDATE
//c002,文具,DELETE
DataStreamSource lines1 = env.socketTextStream("linux01", 7777);
//事实流
//o001,1000,c001
//o002,2000,c002
DataStreamSource lines2 = env.socketTextStream("linux01", 8888);
//期望得到的数据
//o001,1000,c001,图书
//对维度流进行处理
SingleOutputStreamOperator> tpStream1 = lines1.map(new MapFunction>() {
@Override
public Tuple3 map(String input) throws Exception {
String[] fields = input.split(",");
return Tuple3.of(fields[0], fields[1], fields[2]);
}
});
//将维度流数据以MapState的形式广播到下游
MapStateDescriptor stateDescriptor = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.STRING);
BroadcastStream> broadcastStream = tpStream1.broadcast(stateDescriptor);
//对事实流进行处理
SingleOutputStreamOperator> tpStream2 = lines2.map(new MapFunction>() {
@Override
public Tuple3 map(String input) throws Exception {
String[] fields = input.split(",");
return Tuple3.of(fields[0], Double.parseDouble(fields[1]), fields[2]);
}
});
//将两个流connect 可以共享状态
SingleOutputStreamOperator> result = tpStream2.connect(broadcastStream).process(new BroadcastProcessFunction, Tuple3, Tuple4>() {
//处理事实流
@Override
public void processElement(Tuple3 input, ReadonlyContext ctx, Collector> out) throws Exception {
//获取状态
ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(stateDescriptor);
out.collect(Tuple4.of(input.f0, input.f1, input.f2, broadcastState.get(input.f2)));
}
//处理广播流
@Override
public void processBroadcastElement(Tuple3 input, Context ctx, Collector> out) throws Exception {
//获取状态
BroadcastState broadcastState = ctx.getBroadcastState(stateDescriptor);
String cid = input.f0;
String cname = input.f1;
String type = input.f2;
if ("DELETE".equals(type)) {
//删除状态中对应的数据
broadcastState.remove(cid);
} else {
//添加,更新状态中的数据
broadcastState.put(cid, cname);
}
}
});
result.print();
env.execute();
}
}