事件处理会经过几个特殊时间:
Event Time:事件创建的时间 Ingestion Time:数据进入Flink的时间 Processing Time:执行操作算子的本地系统时间,与机器相关 设置时间语义: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置事件时间语义 水位线(Watermark): 乱序数据的影响 当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子 由于网络、分布式等原因,会导致乱序数据的产生,乱序数据会让窗口计算不准确 定义 怎样避免乱序数据带来计算不正确? ➢ 遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口 • Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发 • Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现; • 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此window 的执行也是由 Watermark 触发的。 • watermark 用来让程序自己平衡延迟和结果正确性 watermark 的特点 1:watermark 是一条特殊的数据记录 2:watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退 3:watermark 与数据的时间戳相关 watermark 的传递
每个分区都有自己的watermark,当上游不同分区合并进入下游同一个分区时, 下游会选取上游最小watermark进行广播。
watermark设定 在 Flink 中,watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解 • 如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果 • 而如果watermark到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题 案例package com.atguigu.window;
import com.atguigu.bean.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import javax.annotation.Nullable;
public class EventTimeWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置事件时间语义
env.getConfig().setAutoWatermarkInterval(200); //周期性的watermark是200毫秒
DataStreamSource socketTextStream = env.socketTextStream("hadoop112", 7777);
DataStream inputStream = socketTextStream.map(value -> {
String[] split = value.split(",");
return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
})
// .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
// @Override
// public long extractAscendingTimestamp(SensorReading element) {
// return element.getTimestamp() * 1000L;
// }
// }) //升序,不延迟
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(1)) {
@Override
public long extractTimestamp(SensorReading element) {
return element.getTimestamp() * 1000L;
}
}); //乱序,延迟
OutputTag outputTag = new OutputTag("late");
SingleOutputStreamOperator minBy = inputStream.keyBy("id")
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(outputTag)
.minBy("temperature");
minBy.print("minby");
minBy.getSideOutput(outputTag).print("late");
env.execute();
}
}



