package com.cn.stream.joins;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Properties;
public class cdjoin {
private static OutputTag> first = new OutputTag<>("first", new TypeHint>() {
}.getTypeInfo());
private static OutputTag> second = new OutputTag<>("second", new TypeHint>() {
}.getTypeInfo());
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "tesst");
FlinkKafkaConsumer myConsumer =
new FlinkKafkaConsumer<>("kafkaS", new SimpleStringSchema(), properties);
DataStreamSource stringDataStreamSource = env.addSource(myConsumer);
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "tesst");
FlinkKafkaConsumer myConsumers =
new FlinkKafkaConsumer<>("kafkaSS", new SimpleStringSchema(), properties);
DataStreamSource stringDataStreamSource1 = env.addSource(myConsumers);
KeyedStream, String> tuple3StringKeyedStream = stringDataStreamSource.map(new MapFunction>() {
@Override
public Tuple3 map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple3(split[0], split[1], split[2]);
}
}).keyBy(new KeySelector, String>() {
@Override
public String getKey(Tuple3 value) throws Exception {
return value.f2;
}
});
KeyedStream, String> tuple3StringKeyedStream1 = stringDataStreamSource1.map(new MapFunction>() {
@Override
public Tuple3 map(String value) throws Exception {
String[] split = value.split(",");
return new Tuple3(split[0], split[1], split[2]);
}
}).keyBy(new KeySelector, String>() {
@Override
public String getKey(Tuple3 value) throws Exception {
return value.f0;
}
});
SingleOutputStreamOperator> process = tuple3StringKeyedStream.connect(tuple3StringKeyedStream1).process(new CoProcessFunction, Tuple3, Tuple6>() {
ValueState> stateFirst = null;
ValueState> stateSecond = null;
ValueState Timestate = null;
@Override
public void processElement1(Tuple3 value, CoProcessFunction, Tuple3, Tuple6>.Context ctx, Collector> out) throws Exception {
Tuple3 value2 = stateSecond.value();
//判断第二个数据到来了么 如果到了就直接和 value 添加 一并输出
if (value2 != null) {
// 关联上了 发送数据
out.collect(new Tuple6(value.f0, value.f1, value.f2, value2.f0, value2.f1, value2.f2));
// 清空 第二个流的数据 保证状态清除 不会造成内存溢出
stateSecond.clear();
// 关联上了 删除定时器 防止错误发送 测流 不删除 到时间会 发送到测流 但是 数据已经 关联
ctx.timerService().deleteProcessingTimeTimer(Timestate.value());
// 清除 注册时间的状态 也是防止内存 一直保存
Timestate.clear();
} else {
//如果没到 将 此值保存到状态中 保存等待其他 流调用
stateFirst.update(value);
//获得 当前的处理时间 也就是 系统时间 在此之上 添加 一个 需要多久才能触发 ontimer 的时间 单位是毫秒
long l = ctx.timerService().currentProcessingTime();
//我这里 时间 用 一个Long 类型的状态去保存 目的是 如果 流关联到到了 及时将这个触发器删除
Timestate.update(l + 20000l);
// 注册触发器
ctx.timerService().registerProcessingTimeTimer(Timestate.value());
}
}
@Override
public void processElement2(Tuple3 value, CoProcessFunction, Tuple3, Tuple6>.Context ctx, Collector> out) throws Exception {
Tuple3 value1 = stateFirst.value();
if (value1 != null) {
out.collect(new Tuple6(value1.f0, value1.f1, value1.f2, value.f0, value.f1, value.f2));
stateFirst.clear();
ctx.timerService().deleteProcessingTimeTimer(Timestate.value());
Timestate.clear();
} else {
stateSecond.update(value);
long l = ctx.timerService().currentProcessingTime();
Timestate.update(l + 20000l);
ctx.timerService().registerProcessingTimeTimer(Timestate.value());
}
}
@Override
public void onTimer(long timestamp, CoProcessFunction, Tuple3, Tuple6>.onTimerContext ctx, Collector> out) throws Exception {
Tuple3 value1 = stateFirst.value();
Tuple3 value2 = stateSecond.value();
if (value1 != null) {
ctx.output(first, value1);
}
if (value2 != null) {
ctx.output(second, value2);
}
stateFirst.clear();
stateSecond.clear();
}
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor> firstStream = new ValueStateDescriptor<>("firstStream", new TypeHint>() {
}.getTypeInfo());
ValueStateDescriptor> secondStream = new ValueStateDescriptor<>("secondStream", new TypeHint>() {
}.getTypeInfo());
ValueStateDescriptor timeState = new ValueStateDescriptor<>("timeState", Long.class);
stateFirst = getRuntimeContext().getState(firstStream);
stateSecond = getRuntimeContext().getState(secondStream);
Timestate = getRuntimeContext().getState(timeState);
}
@Override
public void close() throws Exception {
stateFirst.clear();
stateSecond.clear();
}
});
process.print();
process.getSideOutput(first).print("first");
process.getSideOutput(second).print("second");
env.execute("Stream join");
}
}