// TODO 1. 指定数据流以EventTime模式
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设定周期性插入WaterMark时间 EventTime语义下默认为200毫秒,这里可以自定义
env.getConfig.setAutoWatermarkInterval(50)
// 输入数据示例: 1,1547718205,zhangsan
val dataDstream: DataStream[User] = inputDStream
.map(
data => {
val dataArray: Array[String] = data.split(",")
User(dataArray(0), dataArray(1).toLong, dataArray(2))
}
)
// TODO 2. 设定 WatereMark 较小延迟时间 Time.milliseconds(30),接收大部分乱序迟到数据
// assignTimestampsAndWatermarks 处理乱序,相反 assignAscendingTimestamps 为处理不乱序
// BoundedOutOfOrdernessTimestampExtractor 为周期性生成 watermark AssignerWithPeriodicWatermarks的实现
// 对应的 AssignerWithPunctuatedWatermarks 为每条数据后面插入watermark
.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[User](Time.milliseconds(30)) {
override def extractTimestamp(element: User): Long = element.timestamp * 1000
} )
.keyBy("id")
// 窗口起始位置定义 return timestamp - (timestamp - offset + windowSize) % windowSize
.timeWindow(Time.seconds(15))
// TODO 3. 设定较长迟到数据等待
// 真正的最后等待的迟到数据时间要在这个基础上还要加上设定的WaterMark延迟时间(1分钟 + 30毫秒) 超过这个时间会被放入侧输出流
.allowedLateness(Time.minutes(1))
// TODO 4. 超过等待时间的数据放入侧输出流
.sideOutputLateData(new OutputTag[User]("late"))
.reduce( (u1, u2) => User(u1.id, u1.timestamp.min(u2.timestamp), u1.name) )
// TODO 5. 获取测输出流数据
// .getSideOutput(new OutputTag[User]("late"))