设置eventTime的方式分两种情况:
升序数据提取时间戳
直接使用:.assignAscendingTimestamps(_.timestamp * 1000L)乱序数据提取时间戳,有三种种构造方式(1.10版本只有前两种,flink版本1.11以后建议使用方式三)
方式一:AssignerWithPeriodicWatermarks
周期性的生成 watermark,默认周期是200ms,也可以通过setAutoWatermarkInterval设置周期时间常用的实现类是:BoundedOutOfOrdernessTimestampExtractor(延时时间)【1.10及以前的版本,建议使用这种方式】方式二:AssignerWithPunctuatedWatermarks:
阶段性的生成 watermark,即每来一条数据就生成一个wm方式三:WatermarkStrategy (Flink 1.11.0及版本以上)
Flink 1.11.0版本以上版本,建议使用这种方式生成watermark固定乱序长度策略(forBoundedOutOfOrderness)
通过调用WatermarkStrategy对象上的forBoundedOutOfOrderness方法来实现,接收一个Duration类型的参数作为最大乱序(out of order)长度。WatermarkStrategy对象上的withTimestampAssigner方法为从事件数据中提取时间戳提供了接口一般使用这种策略单调递增策略(forMonotonousTimestamps)
通过调用WatermarkStrategy对象上的forMonotonousTimestamps方法来实现,无需任何参数,相当于将forBoundedOutOfOrderness策略的最大乱序长度outOfOrdernessMillis设置为0。不生成策略(noWatermarks)
WatermarkStrategy.noWatermarks()
示例:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 设置生成watermark的时间间隔,系统默认为200毫秒,一般使用系统默认即可
env.getConfig.setAutoWatermarkInterval(5000)
val sensorStream: DataStream[SensorReading] = env
.readTextFile("source.txt")
.map(new MyMapToSensorReading)
// 1、给一个没有乱序,时间为升序的流设置一个EventTime
val ascendingStream = sensorStream
.assignAscendingTimestamps(_.timestamp)
// 2、当流中存在时间乱序问题,引入watermark,并设置延迟时间
// 2.1 使用 AssignerWithPeriodicWatermarks 的常用实现类 BoundedOutOfOrdernessTimestampExtractor 设置EventTime
val watermarkStream: DataStream[SensorReading] = sensorStream
.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
override def extractTimestamp(element: SensorReading): Long = {
element.timestamp * 1000
}
})
// 2.2 使用 AssignerWithPunctuatedWatermarks 设置EventTime 不常用,演示略
// 2.3 使用 WatermarkStrategy 设置EventTime (Flink 1.12.0版本以上)
val jsonObjDS = kafkaDS.map(line => GsonUtils.parserObject(line))
// 提取 eventtime
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner[JsonObject] {
override def extractTimestamp(jsonObj: JsonObject, l: Long): Long = {
jsonObj.get("ts").getAsLong
}
}))



