Core idea of side outputs: emit some or all of the records of the stream being processed into a separate stream.
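Uses:
Data selection
Records of the current stream can be selected with custom logic and emitted to an independent side output.
Retaining abnormal data
Abnormal records are emitted to an independent side output and handled separately, for example to keep late data that arrives after its window has already fired.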
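Side outputs can be emitted from the following functions (other functions do not seem to provide the Context object that is needed):
ProcessFunction
KeyedProcessFunction
CoProcessFunction
KeyedCoProcessFunction
ProcessWindowFunction
ProcessAllWindowFunction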
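The processing method of each of these functions (processElement or process) receives a Context object. Its output(OutputTag<X> outputTag, X value) method emits a record to the side output identified by the tag, and the side stream is then obtained from the resulting SingleOutputStreamOperator with streamData.getSideOutput(outputTag).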
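To make the relationship between the tag, ctx.output and getSideOutput concrete without a Flink cluster, here is a minimal plain-Java simulation. `Tag`, `SideOutputRouter` and `SideOutputDemo` are hypothetical names, not Flink API. The sketch assumes tags are matched only by their id string; Flink's `OutputTag` additionally carries type information.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for OutputTag: identified only by its id string.
final class Tag<T> {
    final String id;

    Tag(String id) {
        this.id = id;
    }
}

// Hypothetical stand-in for one operator's outputs: a main buffer plus one buffer per tag id.
final class SideOutputRouter<OUT> {
    private final List<OUT> main = new ArrayList<>();
    private final Map<String, List<Object>> sides = new HashMap<>();

    // Plays the role of Collector.collect(value): the record goes to the main output.
    void collect(OUT value) {
        main.add(value);
    }

    // Plays the role of ctx.output(tag, value): the record goes to the buffer for that tag.
    <X> void output(Tag<X> tag, X value) {
        sides.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    List<OUT> mainOutput() {
        return main;
    }

    // Plays the role of getSideOutput(tag): everything emitted under that tag id.
    @SuppressWarnings("unchecked")
    <X> List<X> getSideOutput(Tag<X> tag) {
        List<Object> values = sides.getOrDefault(tag.id, new ArrayList<>());
        return (List<X>) (List<?>) values;
    }
}

class SideOutputDemo {
    static final Tag<String> SIDE = new Tag<>("side-output");

    // Same per-record logic as the splitting example: every value goes to the main
    // output and a "sideout-" string goes to the side output.
    static SideOutputRouter<Integer> run(List<Integer> input) {
        SideOutputRouter<Integer> router = new SideOutputRouter<>();
        for (Integer value : input) {
            router.collect(value);
            router.output(SIDE, "sideout-" + value);
        }
        return router;
    }

    public static void main(String[] args) {
        SideOutputRouter<Integer> r = run(List.of(1, 2, 3));
        System.out.println(r.mainOutput());        // [1, 2, 3]
        System.out.println(r.getSideOutput(SIDE)); // [sideout-1, sideout-2, sideout-3]
    }
}
```

The main output and each side output are just separate buffers keyed by the tag, which is why the same tag object must be used when emitting and when calling getSideOutput.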
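Data selection on a DataStream (splitting a stream):
DataStream<Integer> input = ...;
// Define the output tag (an anonymous subclass, so the element type is retained)
final OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
SingleOutputStreamOperator<Integer> mainDataStream = input
    .process(new ProcessFunction<Integer, Integer>() {
        @Override
        public void processElement(
                Integer value,
                Context ctx,
                Collector<Integer> out) throws Exception {
            // Emit the record to the main output
            out.collect(value);
            // Emit a record to the side output
            ctx.output(outputTag, "sideout-" + String.valueOf(value));
        }
    });
// Retrieve the side output collected above
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);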
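Records of the current stream can also be emitted to several side outputs.
Example:
Records from the source that fail to parse are emitted to the formatExceptionData side output; records in a window whose value is even are emitted to the dayle_data2 side output.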
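import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class TransformOperator {
// Side-output tag; must be an anonymous subclass ({}) so Flink can capture the element type
private static final OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("dayle_data2"){};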
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Tuple2<String, Integer>> dataStreams = env
.addSource(new SourceFunction<Tuple2<String, Integer>>() {
Random random = new Random();
int tp2 = 0;
int speed = 0;
int f1 = -1;
// Cleared by cancel() so that run() can terminate
private volatile boolean running = true;
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
while (running) {
TimeUnit.SECONDS.sleep(1);
tp2 = Math.abs(random.nextInt() % 7);
f1 = Math.abs(++speed + tp2);
ctx.collect(Tuple2.of("a", f1));
System.out.println("source generator :\t" + f1);
}
}
@Override
public void cancel() {
running = false;
}
});
// Late-data boundary test
// The tag must be an anonymous inner class
OutputTag<String> erroOutputTag = new OutputTag<String>("formatExceptionData"){};
SingleOutputStreamOperator<Tuple2<String, Integer>> socketTextStream = env
.socketTextStream("10.164.29.143", 10086)
.process(new FlatMapProcess(erroOutputTag));
SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Stream = ReduceWindowPrint(socketTextStream, 5);
// Get the side outputs
socketTextStream.getSideOutput(erroOutputTag).print();
tuple2Stream.getSideOutput(outputTag).print();
// Main stream of normally processed records
tuple2Stream.print();
env.execute("event time process ");
}
private static SingleOutputStreamOperator<Tuple2<String, Integer>> ReduceWindowPrint(SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreams, int dayleTime) {
return dataStreams
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(dayleTime))
.withTimestampAssigner((e, t) -> e.f1 * 1000L)
.withIdleness(Duration.ofSeconds(10))
)
.keyBy(e -> e.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
int i;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
i = 0;
}
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> next = iterator.next();
out.collect(next);
if (next.f1 % 2 == 0) {
// Emit even values to the side output as well
context.output(outputTag, Tuple2.of("side-" + next.f0, next.f1));
}
}
System.out.println("======================= ^^^ window_num:\t" + i++ + " ^^^ ==================");
System.out.println("======================= range: [" + context.window().getStart() + "___" + context.window().getEnd() + " )");
System.out.println("======================= key: " + s );
System.out.println("======================= watermark: " + context.currentWatermark());
System.out.println("======================================================");
}
});
}
}
class FlatMapProcess extends ProcessFunction<String, Tuple2<String, Integer>> {
private final OutputTag<String> outputTag;
public FlatMapProcess(OutputTag<String> outputTag) {
this.outputTag = outputTag;
}
@Override
public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
try {
String[] infos = value.split("\\s+");
if (infos.length == 2) {
out.collect(Tuple2.of(infos[0], Integer.valueOf(infos[1])));
} else {
ctx.output(outputTag, "format exception format_:\t" + value);
}
} catch (NumberFormatException e) {
ctx.output(outputTag, "format exception format_:\t" + value);
}
}
}
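The two routing rules of this example can be checked without Flink. The following plain-Java sketch (hypothetical class `ParseAndRoute`, not part of the original code) applies the same parse rule as `FlatMapProcess` and the same even-value rule as the window function, collecting into one main list and two side lists. It simplifies one thing: in the Flink job the even-value copies are emitted every time a window fires, while here each record is checked once.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: one main list and two side lists, filled by the same rules as the example.
class ParseAndRoute {
    final List<Map.Entry<String, Integer>> mainOutput = new ArrayList<>();
    final List<String> formatExceptionData = new ArrayList<>();
    final List<Map.Entry<String, Integer>> evenData = new ArrayList<>();

    // Rule from FlatMapProcess: exactly two whitespace-separated tokens, the second an integer.
    void parse(String line) {
        String[] infos = line.split("\\s+");
        try {
            if (infos.length == 2) {
                mainOutput.add(new SimpleEntry<>(infos[0], Integer.valueOf(infos[1])));
                return;
            }
        } catch (NumberFormatException e) {
            // not a number: handled like any other malformed line below
        }
        formatExceptionData.add("format exception format_:\t" + line);
    }

    // Rule from the window function: even values are also copied to a second side output.
    void routeEven() {
        for (Map.Entry<String, Integer> e : mainOutput) {
            if (e.getValue() % 2 == 0) {
                evenData.add(new SimpleEntry<>("side-" + e.getKey(), e.getValue()));
            }
        }
    }

    public static void main(String[] args) {
        ParseAndRoute p = new ParseAndRoute();
        for (String line : new String[] {"a", "a 0", "a 1", "a x", "a 10"}) {
            p.parse(line);
        }
        p.routeEven();
        System.out.println(p.mainOutput);                 // [a=0, a=1, a=10]
        System.out.println(p.formatExceptionData.size()); // 2
        System.out.println(p.evenData);                   // [side-a=0, side-a=10]
    }
}
```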
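Application: retaining abnormal data
Example: emit abnormal records that no longer fall within their window's range (late data) to daly_dataStream (the tag sideOutput_dayle_data1 in the code below)
Key steps
1. Define an OutputTag
2. Set how late a window's data may arrive (allowedLateness)
3. Retrieve the late data (sideOutputLateData, then getSideOutput)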
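The three steps above boil down to a timestamp comparison. The sketch below is a plain-Java model (hypothetical class `LatenessCheck`, not Flink API) of Flink's documented allowed-lateness semantics: a tumbling window `[start, start + size)` has `maxTimestamp = end - 1`; the window fires once the watermark reaches `maxTimestamp`, records for it still trigger a late firing until the watermark reaches `maxTimestamp + allowedLateness`, and after that they go to the `sideOutputLateData` tag. Non-negative timestamps and a zero window offset are assumed.

```java
class LatenessCheck {
    enum Fate { ON_TIME, LATE_FIRING, SIDE_OUTPUT_LATE }

    // Start of the tumbling window containing ts (offset 0, ts >= 0 assumed).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // What happens to a record with timestamp ts when it arrives at the given watermark.
    static Fate classify(long ts, long size, long allowedLateness, long watermark) {
        long maxTimestamp = windowStart(ts, size) + size - 1; // window end - 1
        if (maxTimestamp + allowedLateness <= watermark) {
            return Fate.SIDE_OUTPUT_LATE; // window state already cleaned up
        }
        if (maxTimestamp <= watermark) {
            return Fate.LATE_FIRING;      // window already fired; it fires again with this record
        }
        return Fate.ON_TIME;
    }

    public static void main(String[] args) {
        long size = 10_000, lateness = 2_000;
        System.out.println(classify(1_000, size, lateness, 4_999));  // ON_TIME
        System.out.println(classify(9_000, size, lateness, 10_999)); // LATE_FIRING
        System.out.println(classify(1_000, size, lateness, 11_999)); // SIDE_OUTPUT_LATE
    }
}
```

With the 10 s window and 2 s allowed lateness used below, window [0, 10000) therefore stops accepting records once the watermark reaches 11999.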
Code:
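import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;
import java.util.Iterator;

public class TransformOperator {
// Side-output tags; each must be an anonymous inner class
// Input records whose value is even
private static final OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("dayle_data2"){};
// Late records that arrive after the allowed lateness (filled by sideOutputLateData)
private static final OutputTag<Tuple2<String, Integer>> sideOutput_dayle_data1 = new OutputTag<Tuple2<String, Integer>>("sideOutput_dayle_data1"){};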
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// The tag must be an anonymous inner class
OutputTag<String> erroOutputTag = new OutputTag<String>("formatExceptionData"){};
SingleOutputStreamOperator<Tuple2<String, Integer>> socketTextStream = env
.socketTextStream("10.164.29.143", 10086)
// Parse the input and send malformed records to the side output
.process(new FlatMapProcess(erroOutputTag));
SingleOutputStreamOperator<Tuple2<String, Integer>> tuple2Stream = ReduceWindowPrint(socketTextStream, 5);
// Get the side output of source records that failed to parse
socketTextStream.getSideOutput(erroOutputTag).map((MapFunction<String, String>) e -> "parseException:\t" + e).print();
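// Get the late records
tuple2Stream.getSideOutput(sideOutput_dayle_data1).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> e) throws Exception {
return Tuple2.of("latedata:\t" + e.f0, e.f1);
}
}).print();
// Get the input records whose value is even
tuple2Stream.getSideOutput(outputTag).map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
return Tuple2.of("o_" + value.f0, value.f1);
}
}).print();
// Main stream of normally processed records
tuple2Stream.print();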
env.execute("event time process ");
}
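private static SingleOutputStreamOperator<Tuple2<String, Integer>> ReduceWindowPrint(SingleOutputStreamOperator<Tuple2<String, Integer>> dataStreams, int dayleTime) {
return dataStreams
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(dayleTime))
.withTimestampAssigner((e, t) -> e.f1 * 1000L)
// With parallelism > 1, a parallel instance that receives no data never advances its watermark.
// A downstream operator uses the minimum watermark over all its inputs, so its windows never fire
// and every downstream operator stalls.
// withIdleness marks an instance idle once it has received no data for the given time (10s here);
// idle inputs are ignored, so windows fire as soon as the other inputs' watermarks advance.
.withIdleness(Duration.ofSeconds(10))
)
.keyBy(e -> e.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// Records of an already-fired window are still accepted for 2 more seconds of event time
.allowedLateness(Time.seconds(2))
// Records later than that go to this side output
.sideOutputLateData(sideOutput_dayle_data1)
.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {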
int i;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
i = 0;
}
@Override
public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {
Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> next = iterator.next();
out.collect(next);
if (next.f1 % 2 == 0) {
// Emit even values to the side output as well
context.output(outputTag, Tuple2.of("side-" + next.f0, next.f1));
}
}
System.out.println("======================= ^^^ window_num:\t" + i++ + " ^^^ ==================");
System.out.println("======================= range: [" + context.window().getStart() + "___" + context.window().getEnd() + " )");
System.out.println("======================= key: " + s );
System.out.println("======================= watermark: " + context.currentWatermark());
System.out.println("======================================================");
}
});
}
}
class FlatMapProcess extends ProcessFunction<String, Tuple2<String, Integer>> {
private final OutputTag<String> outputTag;
public FlatMapProcess(OutputTag<String> outputTag) {
this.outputTag = outputTag;
}
@Override
public void processElement(String value, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
try {
String[] infos = value.split("\\s+");
if (infos.length == 2) {
out.collect(Tuple2.of(infos[0], Integer.valueOf(infos[1])));
} else {
ctx.output(outputTag, "format exception format_:\t" + value);
}
} catch (NumberFormatException e) {
// Collect the malformed record into the side output named by outputTag
ctx.output(outputTag, "format exception format_:\t" + value);
}
}
}
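The `withIdleness` comment in the code can be illustrated with a small plain-Java model. `BoundedGenerator` uses the same formula as Flink's bounded-out-of-orderness strategy (largest timestamp seen minus the delay minus 1 ms), and `combined` is a hypothetical stand-in for a downstream operator that takes the minimum watermark over its non-idle input channels. The class and method names are assumptions made for this sketch, and the all-idle case is simplified.

```java
import java.util.HashMap;
import java.util.Map;

class WatermarkSketch {
    // Bounded out-of-orderness: watermark = largest timestamp seen - delay - 1 ms.
    static final class BoundedGenerator {
        private final long delayMs;
        private long maxTs;

        BoundedGenerator(long delayMs) {
            this.delayMs = delayMs;
            this.maxTs = Long.MIN_VALUE + delayMs + 1; // so the first watermark is Long.MIN_VALUE
        }

        void onEvent(long timestampMs) {
            maxTs = Math.max(maxTs, timestampMs); // out-of-order records never move it back
        }

        long currentWatermark() {
            return maxTs - delayMs - 1;
        }
    }

    // Hypothetical downstream operator: minimum watermark over its non-idle input channels.
    // If every channel is idle this sketch simply reports "no progress" (Long.MIN_VALUE).
    static long combined(Map<Integer, Long> channelWatermarks, Map<Integer, Boolean> idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : channelWatermarks.entrySet()) {
            if (!idle.getOrDefault(e.getKey(), false)) {
                min = Math.min(min, e.getValue());
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        BoundedGenerator busy = new BoundedGenerator(5_000);
        busy.onEvent(16_000);
        Map<Integer, Long> wm = new HashMap<>();
        wm.put(0, busy.currentWatermark());                        // 10999
        wm.put(1, new BoundedGenerator(5_000).currentWatermark()); // never saw data
        Map<Integer, Boolean> idle = new HashMap<>();
        System.out.println(combined(wm, idle)); // stuck at Long.MIN_VALUE
        idle.put(1, true);                      // roughly what withIdleness does after the timeout
        System.out.println(combined(wm, idle)); // 10999
    }
}
```

With the 5 s delay used in the code, a record with value 15 (timestamp 15000 ms) yields watermark 9999, the value printed at the first window firing in the output.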
Input (one record per line)
a
a 0
a 1
a 10
a 15
a 16
a 10
a 9
a 8
a 1
a 12
a 1
a 15
a 1
a 17
a 1
Output
2> parseException: format exception format_: a
1> parseException: format exception format_: 
2> (a,0)
2> (o_side-a,0)
2> (a,1)
======================= ^^^ window_num: 0 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 9999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
======================= ^^^ window_num: 1 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
2> (a,8)
2> (o_side-a,8)
======================= ^^^ window_num: 2 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
2> (a,8)
2> (o_side-a,8)
2> (a,1)
======================= ^^^ window_num: 3 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
2> (a,8)
2> (o_side-a,8)
2> (a,1)
2> (a,1)
======================= ^^^ window_num: 4 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (a,0)
2> (o_side-a,0)
2> (a,1)
2> (a,9)
2> (a,8)
2> (o_side-a,8)
2> (a,1)
2> (a,1)
2> (a,1)
======================= ^^^ window_num: 5 ^^^ ==================
======================= range: [0___10000 )
======================= key: a
======================= watermark: 10999
======================================================
2> (latedata: a,1) -- after "a 17" was entered, this record is past the allowed lateness and no longer reaches the window
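The output above can be reproduced by hand. The following plain-Java replay (hypothetical class `WindowReplay`, not Flink code) feeds the valid input values through one tumbling 10 s window with 5 s out-of-orderness and 2 s allowed lateness, assuming a single watermark channel that advances after every record. Under that assumption it logs six firings of window [0, 10000) followed by the late record `1`, matching window_num 0 to 5 and the latedata line. With parallelism 2 the real watermark timing also depends on which subtask receives each record and on idleness, so treat this as a sketch rather than a guarantee.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class WindowReplay {
    static final long SIZE = 10_000, LATENESS = 2_000, DELAY = 5_000;

    static List<String> replay(int[] seconds) {
        List<String> log = new ArrayList<>();
        TreeMap<Long, List<Integer>> windows = new TreeMap<>(); // window start -> buffered values
        Set<Long> fired = new HashSet<>();
        long maxTs = Long.MIN_VALUE + DELAY + 1;
        long watermark = Long.MIN_VALUE;
        for (int s : seconds) {
            long ts = s * 1000L;                     // same as the timestamp assigner: f1 * 1000
            long start = ts - ts % SIZE;             // tumbling window [start, start + SIZE)
            long maxTimestamp = start + SIZE - 1;
            if (maxTimestamp + LATENESS <= watermark) {
                log.add("late " + s);                // would go to sideOutputLateData
            } else {
                List<Integer> contents = windows.computeIfAbsent(start, k -> new ArrayList<>());
                contents.add(s);
                if (maxTimestamp <= watermark) {     // already fired once: late firing
                    log.add("fire " + start + " " + contents + " wm=" + watermark);
                }
            }
            // Advance the watermark after the record (bounded out-of-orderness formula).
            maxTs = Math.max(maxTs, ts);
            watermark = Math.max(watermark, maxTs - DELAY - 1);
            Iterator<Map.Entry<Long, List<Integer>>> it = windows.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<Integer>> w = it.next();
                long wMax = w.getKey() + SIZE - 1;
                if (wMax <= watermark && fired.add(w.getKey())) { // regular (first) firing
                    log.add("fire " + w.getKey() + " " + w.getValue() + " wm=" + watermark);
                }
                if (wMax + LATENESS <= watermark) {
                    it.remove();                     // window state is purged
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        int[] input = {0, 1, 10, 15, 16, 10, 9, 8, 1, 12, 1, 15, 1, 17, 1};
        replay(input).forEach(System.out::println);
    }
}
```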



