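Out-of-order data scenario
Business data collection cannot always guarantee that records arrive in order, and records processed in the wrong order can lead to incorrect business results. For example, take the following data (each record is a key and an event timestamp in milliseconds):
01,1635867066000
01,1635867067000
01,1635867068000
01,1635867069000
01,1635867070000
01,1635867071000
Example verification
POM file
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.13.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.29</version>
    </dependency>
Code implementation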
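Settings explained:
env.getConfig().setAutoWatermarkInterval(1000L): emit a watermark automatically once per second
TumblingEventTimeWindows.of(Time.seconds(4)): tumbling window of 4 s
private long maxOutOfOrderness = 3000L: maximum allowed delay (out-of-orderness) of 3 s
Note:
// Initially written as Long.MIN_VALUE, which made
// new Watermark(maxTimeStamp - maxOutOfOrderness) overflow the long range.
// The overflow raises no error, so it is hard to track down.
// private long maxTimeStamp = Long.MIN_VALUE;
// After troubleshooting, changed to:
private long maxTimeStamp = 0L;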
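The overflow described in the note can be reproduced with plain Java arithmetic, without Flink. The following is a minimal sketch; the class name `WatermarkOverflowDemo` and the helper `watermarkFor` are made up for illustration and only mirror the expression `maxTimeStamp - maxOutOfOrderness` from the generator.

```java
class WatermarkOverflowDemo {
    static final long MAX_OUT_OF_ORDERNESS = 3000L;

    // Same arithmetic as new Watermark(maxTimeStamp - maxOutOfOrderness)
    static long watermarkFor(long maxTimeStamp) {
        return maxTimeStamp - MAX_OUT_OF_ORDERNESS;
    }

    public static void main(String[] args) {
        // Long subtraction wraps around silently: the result is a huge positive value
        long wrapped = watermarkFor(Long.MIN_VALUE);
        System.out.println("start at Long.MIN_VALUE -> " + wrapped);
        System.out.println("wrapped value is positive: " + (wrapped > 0));

        // Starting at 0L (the fix used in this post) gives a small negative watermark
        System.out.println("start at 0L -> " + watermarkFor(0L));

        // Math.subtractExact turns the silent wrap-around into an exception
        try {
            Math.subtractExact(Long.MIN_VALUE, MAX_OUT_OF_ORDERNESS);
        } catch (ArithmeticException e) {
            System.out.println("subtractExact threw: " + e.getMessage());
        }
    }
}
```

Because the wrapped value is close to Long.MAX_VALUE, the first watermark would already be ahead of every real event timestamp, which is consistent with windows silently producing nothing. Another possible initial value, if "no data seen yet" should stay as low as possible, is Long.MIN_VALUE + maxOutOfOrderness, since the subtraction then cannot wrap.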
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Iterator;
public class IoTMain4 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
        // Interval at which watermarks are emitted automatically
        env.getConfig().setAutoWatermarkInterval(1000L);
        env.setParallelism(1);
        DataStreamSource<String> sourceDs = env.socketTextStream("localhost", 9000);
        // Parse "key,timestamp" lines into (key, timestamp) tuples
        SingleOutputStreamOperator<Tuple2<String, Long>> mapDs = sourceDs
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    private static final long serialVersionUID = -5181351998053732122L;

                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return Tuple2.of(split[0], Long.valueOf(split[1]));
                    }
                });
        // Emit watermarks periodically
        SingleOutputStreamOperator<Tuple2<String, Long>> watermarks = mapDs
                .assignTimestampsAndWatermarks(new WatermarkStrategy<Tuple2<String, Long>>() {
                    private static final long serialVersionUID = -8873639694196414860L;

                    @Override
                    public WatermarkGenerator<Tuple2<String, Long>> createWatermarkGenerator(
                            WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<Tuple2<String, Long>>() {
                            private long maxTimeStamp = 0L;
                            private long maxOutOfOrderness = 3000L; // maximum allowed delay

                            @Override
                            public void onEvent(Tuple2<String, Long> event, long eventTimestamp,
                                    WatermarkOutput output) {
                                // Called once for every incoming record
                                maxTimeStamp = Math.max(maxTimeStamp, event.f1);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                // Called periodically to emit the watermark
                                output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.f1));
        // Count the records per key in each 4 s tumbling event-time window
        watermarks.keyBy(x -> x.f0).window(TumblingEventTimeWindows.of(Time.seconds(4)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    private static final long serialVersionUID = 65693184846116387L;

                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                            Collector<String> out) throws Exception {
                        Iterator<Tuple2<String, Long>> iterator = input.iterator();
                        int count = 0;
                        while (iterator.hasNext()) {
                            count++;
                            iterator.next();
                        }
                        out.collect(window.getStart() + "->" + window.getEnd() + " " + s + ":" + count);
                    }
                }).print();
        env.execute();
    }
}
Test results
Use netcat to send the test data above to port 9000:
C:\Users\xxx> nc -l -p 9000
01,1635867066000
01,1635867067000
01,1635867068000
01,1635867069000
01,1635867070000
01,1635867071000
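When the last record, 01,1635867071000, is processed, the window **[1635867064000, 1635867068000)** fires, and from then on it no longer accepts records that fall in this range (how such late records are handled can be customized).
With a size of 4 s, the tumbling windows split every minute into 4-second intervals, each closed at the start and open at the end.
For example, minute 23:31 of 2021-11-02 is divided into:
[2021-11-02 23:31:00, 2021-11-02 23:31:04)
[2021-11-02 23:31:04, 2021-11-02 23:31:08)
....
[2021-11-02 23:31:56, 2021-11-02 23:32:00)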
[1635867064000, 1635867068000) corresponds to [2021-11-02 23:31:04, 2021-11-02 23:31:08) in UTC+8.
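The mapping from a timestamp to its window can be checked with a few lines of plain Java. This is a sketch under two assumptions: windows are aligned to the epoch with no offset (start = timestamp minus timestamp modulo window size, for non-negative timestamps), and the wall-clock times quoted here are in UTC+8. The class and method names are illustrative only.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

class WindowAlignmentDemo {
    static final long WINDOW_SIZE_MS = 4000L;
    // Assumption: the wall-clock times in the text are UTC+8
    static final ZoneOffset ZONE = ZoneOffset.ofHours(8);

    // Start of the epoch-aligned tumbling window that contains the timestamp
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Epoch milliseconds rendered as local date-time in the assumed zone
    static String format(long epochMs) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMs), ZONE).toString();
    }

    public static void main(String[] args) {
        long[] samples = {
            1635867066000L, 1635867067000L, 1635867068000L,
            1635867069000L, 1635867070000L, 1635867071000L
        };
        for (long ts : samples) {
            long start = windowStart(ts);
            long end = start + WINDOW_SIZE_MS;
            System.out.println(ts + " -> [" + start + ", " + end + ") = ["
                    + format(start) + ", " + format(end) + ")");
        }
    }
}
```

Under these assumptions the first two test records land in [1635867064000, 1635867068000) and the remaining four in [1635867068000, 1635867072000).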
The maximum allowed delay is 3 s, so the window [1635867064000, 1635867068000) fires only once a record with a timestamp >= 1635867071000 arrives, that is, 1635867068000 + 3000 = 1635867071000.
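The firing behaviour described in this section can be traced without a Flink cluster. The sketch below is a simplified single-key simulation, not Flink code: it advances the watermark after every record instead of once per second, treats a window as complete once the watermark reaches its last millisecond (end - 1), and drops records for windows that are already complete. The class name, the `run` helper and the extra late record 1635867065000 are assumptions added for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class WatermarkWindowSimulation {
    static final long WINDOW_SIZE_MS = 4000L;
    static final long MAX_OUT_OF_ORDERNESS_MS = 3000L;
    static final String KEY = "01"; // single key, as in the test data

    // Feeds timestamps in arrival order; returns one line per fired window or dropped record
    static List<String> run(long[] timestamps) {
        List<String> log = new ArrayList<>();
        TreeMap<Long, Integer> openWindows = new TreeMap<>(); // window start -> record count
        long maxTimestamp = 0L;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long start = ts - (ts % WINDOW_SIZE_MS);
            long lastMs = start + WINDOW_SIZE_MS - 1;
            if (lastMs <= watermark) {
                // The window for this record is already complete: the record is late
                log.add("late, dropped: " + ts);
            } else {
                openWindows.merge(start, 1, Integer::sum);
            }
            // Bounded out-of-orderness: watermark trails the largest timestamp by 3 s
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - MAX_OUT_OF_ORDERNESS_MS;
            // Fire every open window whose last millisecond is covered by the watermark
            while (!openWindows.isEmpty()
                    && openWindows.firstKey() + WINDOW_SIZE_MS - 1 <= watermark) {
                Map.Entry<Long, Integer> fired = openWindows.pollFirstEntry();
                long end = fired.getKey() + WINDOW_SIZE_MS;
                log.add(fired.getKey() + "->" + end + " " + KEY + ":" + fired.getValue());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        long[] arrivals = {
            1635867066000L, 1635867067000L, 1635867068000L,
            1635867069000L, 1635867070000L, 1635867071000L,
            1635867065000L // hypothetical late record, not part of the original test data
        };
        for (String line : run(arrivals)) {
            System.out.println(line);
        }
    }
}
```

In this simulation nothing is emitted for the first five records; the record 1635867071000 moves the watermark to 1635867068000 and fires [1635867064000, 1635867068000) with a count of 2, and the late record 1635867065000 is then dropped. A record that arrives out of order but before the window has fired (for example 1635867067000 after 1635867069000) is still counted.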



