Join (inner join over tumbling event-time windows)
public class EventTumblingWindowJoin {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1000,o001,c001
DataStreamSource lines1 = env.socketTextStream("linux01", 7777);
//1200,c001,图书
DataStreamSource lines2 = env.socketTextStream("linux01", 8888);
//按照EventTime进行join,窗口长度为5000秒,使用新的提取EventTime生成WaterMark的API
//提取两个流的Watermark
SingleOutputStreamOperator lines1WithWatermark
= lines1.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
}));
SingleOutputStreamOperator lines2WithWatermark
= lines2.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
}));
//对两个流进行处理
SingleOutputStreamOperator> tpStream1
= lines1WithWatermark.map(new MapFunction>() {
@Override
public Tuple3 map(String input) throws Exception {
String[] fields = input.split(",");
return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
}
});
SingleOutputStreamOperator> tpStream2
= lines2WithWatermark.map(new MapFunction>() {
@Override
public Tuple3 map(String input) throws Exception {
String[] fields = input.split(",");
return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
}
});
//将两个流join
DataStream> result = tpStream1.join(tpStream2)
.where(tp1 -> tp1.f2) //第一个流keyBY的字段
.equalTo(tp2 -> tp2.f1) //第二个流keyBy的字段
.window(TumblingEventTimeWindows.of(Time.seconds(5))) //划分窗口
//全量聚合的处理逻辑
.apply(new JoinFunction, Tuple3, Tuple5>() {
//窗口触发后,条件相同的,并且在同一个窗口内的数据,会传入到join方法中
@Override
public Tuple5 join(Tuple3 first, Tuple3 second) throws Exception {
return Tuple5.of(first.f0,first.f1,first.f2,second.f0,second.f2);
}
});
result.print();
env.execute();
}
}
LeftOuterJoin (left outer join over tumbling event-time windows, implemented with coGroup)
public class EventTumblingWindowLeftOuterJoin {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1000,o001,c001
DataStreamSource lines1 = env.socketTextStream("linux01", 7777);
//1200,c001,图书
DataStreamSource lines2 = env.socketTextStream("linux01", 8888);
//按照EventTime进行join,窗口长度为5000秒,使用新的提取EventTime生成WaterMark的API
//提取两个流的Watermark
SingleOutputStreamOperator lines1WithWatermark
= lines1.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
}));
SingleOutputStreamOperator lines2WithWatermark
= lines2.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
}));
//对两个流进行处理
SingleOutputStreamOperator> tpStream1
= lines1WithWatermark.map(new MapFunction>() {
@Override
public Tuple3 map(String input) throws Exception {
String[] fields = input.split(",");
return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
}
});
SingleOutputStreamOperator> tpStream2
= lines2WithWatermark.map(new MapFunction>() {
@Override
public Tuple3 map(String input) throws Exception {
String[] fields = input.split(",");
return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
}
});
//将两个流leftOuterJoin
DataStream> result = tpStream1.coGroup(tpStream2)
.where(tp1 -> tp1.f2) //第一个流keyBy的字段
.equalTo(tp2 -> tp2.f1)//第二个流keyBy的字段
.window(TumblingEventTimeWindows.of(Time.seconds(5)))//划分窗口
.apply(new CoGroupFunction, Tuple3, Tuple5>() {
@Override
public void coGroup(Iterable> first, Iterable> second, Collector> out) throws Exception {
for (Tuple3 left : first) {
//实现左外连接
//先循环左流的数据
boolean isEmpty = false;
for (Tuple3 right : second) {
isEmpty = true;
out.collect(Tuple5.of(left.f0, left.f1, left.f2, right.f0, right.f2));
}
if (!isEmpty) {
out.collect(Tuple5.of(left.f0, left.f1, left.f2, null, null));
}
}
}
});
result.print();
env.execute();
}
}
IntervalJoin (event-time interval join on keyed streams; no windows involved)
public class EventTumblingWindowIntervalJoin {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//1000,o001,c001
DataStreamSource lines1 = env.socketTextStream("linux01", 7777);
//1200,c001,图书
DataStreamSource lines2 = env.socketTextStream("linux01", 8888);
//按照EventTime进行join,窗口长度为5000秒,使用新的提取EventTime生成WaterMark的API
//提取两个流的Watermark
SingleOutputStreamOperator lines1WithWatermark
= lines1.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
}));
SingleOutputStreamOperator lines2WithWatermark
= lines2.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
}));
//对两个流进行处理
SingleOutputStreamOperator> tpStream1
= lines1WithWatermark.map(new MapFunction>() {
@Override
public Tuple3 map(String input) throws Exception {
String[] fields = input.split(",");
return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
}
});
SingleOutputStreamOperator> tpStream2
= lines2WithWatermark.map(new MapFunction>() {
@Override
public Tuple3 map(String input) throws Exception {
String[] fields = input.split(",");
return Tuple3.of(Long.parseLong(fields[0]), fields[1], fields[2]);
}
});
//将两个流join
KeyedStream, String> keyedStream1 = tpStream1.keyBy(tp -> tp.f2);
KeyedStream, String> keyedStream2 = tpStream2.keyBy(tp -> tp.f1);
SingleOutputStreamOperator> result = keyedStream1.intervalJoin(keyedStream2)
.between(Time.seconds(-1), Time.seconds(1)) //指定的时间范围
.upperBoundExclusive() //不包括上界
.process(new ProcessJoinFunction, Tuple3, Tuple5>() {
@Override
public void processElement(Tuple3 left, Tuple3 right, Context ctx, Collector> out) throws Exception {
out.collect(Tuple5.of(left.f0,left.f1,left.f2,right.f0,right.f2));
}
});
result.print();
env.execute();
}
}



