官网 :https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/side_output/
1.1 理解侧输出流就是将问题数据或者 不符合条件的数据进行输出到数据库中或者打印出来, 就形成一个正确的流和一个不符合条件的流。1.2 实例
案例: 判断 输入的字符串是否等于 "big" 如果等于big 就输入到 主输出流 否则将抛出异常
package com.wudl.flink.stream;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import static org.apache.hadoop.yarn.webapp.hamlet.HamletSpec.Media.print;
public class Flink_SideOutput {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource dsStream = env.socketTextStream("192.168.1.130", 9999);
OutputTag outputTag = new OutputTag("number") {
};
SingleOutputStreamOperator process = dsStream.process(new ProcessFunction() {
@Override
public void processElement(String s, Context ctx, Collector collector) throws Exception {
// 判断 输入的字符串是否等于 "big" 如果等于big 就输入到 主输出流 否则将抛出异常
try {
if (s.equals("big")) {
collector.collect(s);
}else
{
throw new Exception();
}
}catch (Exception e)
{
// e.printStackTrace();
ctx.output(outputTag,s);
}
}
});
// 打印主流
process.print("打印主流 ----->");
// 打印错误的的流或者是 不符合条件的流
process.getSideOutput(outputTag).print("侧输出流----->");
env.execute();
}
}
1.3结果
3. Flink 使用侧输出流把一个流拆成多个流
Flink 12 以后官方建议我们使用多侧输入流来输出
3.1实例:根据输入的数据 进行条件判断然后输入到不同的侧输入流中。
package com.wudl.flink.stream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class Flink_SideOutputTwo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource socketTextStream = env.socketTextStream("192.168.1.130", 9999);
OutputTag outputTag6 = new OutputTag("number6-10") {
};
OutputTag outputTag10 = new OutputTag("number10") {
};
SingleOutputStreamOperator process = socketTextStream.process(new ProcessFunction() {
@Override
public void processElement(String s, Context cxt, Collector out) throws Exception {
int number = Integer.parseInt(s);
if (number < 5) {
out.collect(number);
} else if (number > 5 && number < 10) {
cxt.output(outputTag6, number);
} else {
cxt.output(outputTag10, number);
}
}
});
process.print(" 主流-------->");
process.getSideOutput(outputTag6).print("侧输入流-- 大于5 小于10 的数据---");
process.getSideOutput(outputTag10).print("侧输入流-- 大于10 的数据---");
env.execute();
}
}
3.2 结果
********************************************************** 应用的实例***********************************************************************
案例: 通过开窗将迟到的数据输出到侧输入流中 ,
package com.wudl.flink.stream;
import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
public class Flink_SideOutput_Examples {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource stringDataStreamSource = env.socketTextStream("192.168.1.130", 9999);
SingleOutputStreamOperator map = stringDataStreamSource.map(new MapFunction() {
@Override
public WaterSensor map(String s) throws Exception {
String[] split = s.split(",");
return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.parseInt(split[2]));
}
});
// 创建产生水印策略
WatermarkStrategy wms = WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs() * 1000;
}
});
SingleOutputStreamOperator process = map.assignTimestampsAndWatermarks(wms).keyBy(WaterSensor::getId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(new OutputTag("side_1") {
})
.process(new ProcessWindowFunction() {
@Override
public void process(String key, Context context, Iterable elements, Collector out) throws Exception {
String msg = "当前key: " + key + " 窗口: [" + context.window().getStart() / 1000 + "," + context.window().getEnd() / 1000 + ") 一共有 "
+ elements.spliterator().estimateSize() + "条数据" +
"watermark: " + context.currentWatermark();
out.collect(context.window().toString());
out.collect(msg);
}
});
process.print("主输出流---->");
process.getSideOutput(new OutputTag("side_1"){}).print("侧输出流-------->");
env.execute();
}
}



