侧流输出示例
/**
 * Side-output demo: reads text lines from a socket and routes each line to one of
 * three side outputs depending on its content.
 *
 * <ul>
 *   <li>lines that parse as an even integer go to {@code evenTag}</li>
 *   <li>lines that parse as an odd integer go to {@code oddTag}</li>
 *   <li>lines that are not integers go to {@code strTag}</li>
 * </ul>
 *
 * Only the even-number and string side outputs are printed. The process function
 * never calls {@code out.collect(...)}, so the main stream carries no elements and
 * the "主流" print sink stays silent.
 */
public class SideOutputDemo {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if job construction or execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> lines = env.socketTextStream("linux01", 7777);

        // An OutputTag must know its element type: either pass the type information
        // explicitly (as done here) or create an anonymous subclass, e.g.
        //   new OutputTag<Integer>("odd-data") {};
        final OutputTag<Integer> oddTag = new OutputTag<>("odd-data", Types.INT);
        final OutputTag<Integer> evenTag = new OutputTag<>("even-data", Types.INT);
        final OutputTag<String> strTag = new OutputTag<>("str-data", Types.STRING);

        SingleOutputStreamOperator<String> mainStream =
                lines.process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String line, Context ctx, Collector<String> out)
                            throws Exception {
                        try {
                            int num = Integer.parseInt(line);
                            if (num % 2 == 0) {
                                // even number
                                ctx.output(evenTag, num);
                            } else {
                                // odd number
                                ctx.output(oddTag, num);
                            }
                        } catch (NumberFormatException e) {
                            // not an integer: treat the raw line as a string record
                            ctx.output(strTag, line);
                        }
                    }
                });

        // Side outputs are retrieved from the operator that emitted them.
        DataStream<Integer> evenStream = mainStream.getSideOutput(evenTag);
        DataStream<String> strStream = mainStream.getSideOutput(strTag);

        evenStream.print("偶数流");
        strStream.print("字符流");
        // Nothing is collected on the main output above, so this sink prints nothing.
        mainStream.print("主流");

        env.execute();
    }
}
使用侧流输出获取窗口中迟到的数据
public class GetWindowLateDataDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//开启checkpointing
env.enableCheckpointing(10000);
//设置重启策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000));
DataStreamSource lines = env.socketTextStream("linux01", 7777);
//获取Watermark
SingleOutputStreamOperator linesWithWatermark = lines.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(String element, long recordTimestamp) {
return Long.parseLong(element.split(",")[0]);
}
}));
SingleOutputStreamOperator> wordAndCount = linesWithWatermark.map(new MapFunction>() {
@Override
public Tuple2 map(String line) throws Exception {
if (line.startsWith("error")){
throw new RuntimeException("错误数据");
}
String[] fields = line.split(",");
return Tuple2.of(fields[1], Integer.parseInt(fields[2]));
}
});
//按照单词keyBy
KeyedStream, String> keyedStream = wordAndCount.keyBy(tp -> tp.f0);
//划分窗口
WindowedStream, String, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.seconds(5)));
//给迟到的数据打上标签
OutputTag> lateDataTag = new OutputTag>("late-data"){};
windowedStream.sideOutputLateData(lateDataTag);
SingleOutputStreamOperator> result = windowedStream.sum(1);
//获取迟到的数据
DataStream> lateDataStream = result.getSideOutput(lateDataTag);
result.print();
lateDataStream.print("迟到的数据");
env.execute();
}
}