样例代码| watermark(水位线)| allowedLateness(最大迟到数据)| sideOutputLateData(侧输出流)
package com.andy.flink.demo.datastream.sideoutputs
import com.andy.flink.demo.datastream.sideoutputs.FlinkHandleLateDataTest2.SensorReading
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object FlinkHandleLateDataTest2 {
//定义类定义实体Model
case class SensorReading(id: String,
timestamp: Long,
temperature: Double)
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism( 1 )
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime )
// 设置watermark的默认生成周期(单位:毫秒) -> 100毫秒生成一个WaterMark. 全局设置, 算子中如果设置将覆盖该全局设置
env.getConfig.setAutoWatermarkInterval( 100L )
val inputDStream: DataStream[String] = env.socketTextStream( "localhost", 9999 )
val dataDstream: DataStream[SensorReading] = inputDStream
.map( data => {
val dataArray: Array[String] = data.split( "," )
SensorReading( dataArray( 0 ), dataArray( 1 ).toLong, dataArray( 2 ).toDouble )
} )
// .assignAscendingTimestamps( _.timestamp * 1000L ) // 最理想状态:数据无延迟,按时间正序到达(这种理想情况下,直接指定时间戳字段就可以了)
.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]
// 给WaterMark的一个初始值延时时间,一般该值应能够覆盖住70%~80%左右的延迟数据
( Time.milliseconds( 1000 ) ) {
// 指定时间戳字段以秒为单位 * 1000(这里需要使用 ms 单位,数据中的时间请自行转换为毫秒)
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
} )
val lateOutputTag = new OutputTag[SensorReading]( "late" )
// 迟到数据处理的三重保证机制: watermark(水位线) | allowedLateness(最大迟到数据) | sideOutputLateData(侧输出流)
val resultDStream: DataStream[SensorReading] = dataDstream
.keyBy( "id" ) //按什么分组,形成键控流
.timeWindow( Time.seconds( 5 ) ) //简便起见,这里使用滚动窗口
.allowedLateness( Time.minutes( 1 ) ) //允许的数据最大延迟时间,则触发窗口关闭的时间为(窗口长度+Watermark时长+允许数据延迟的时间, 本例中为:5+1+60)
.sideOutputLateData( lateOutputTag )
.reduce( new MyReduceFunc() )
dataDstream.print( "main-flow" )
resultDStream.print( "result-flow" )
// 获取侧输出流late并打印输出
resultDStream.getSideOutput( lateOutputTag ).print( "late-flow" )
env.execute( "FlinkHandleLateDataTest2" )
}
}
class MyReduceFunc extends ReduceFunction[SensorReading] {
override def reduce(value1: SensorReading,
value2: SensorReading): SensorReading = {
SensorReading(
value1.id,
value2.timestamp,
value1.temperature.min( value2.temperature )
)
}
}
三件套之一 : 水平线watermark
窗口关闭为5秒,watermark延时为1秒,所以其实窗口数据只要[0,5), 5取不到(其实具体的时间戳的窗口从哪开始到那结束会有一个方法,并不是第一条数据时间戳是0就往后延长5秒,下面会看到),但是因为watermark延长了1s 输出到了时间戳6秒的时候才会进行输出,这时候窗口并没有关闭,因为我们设置了迟到数据allowedLateness
迟到数据设置成了一分钟,这一分钟之类所有的在[0,5)时间戳里的数据都会进行输出,来一条数据,直接输出一条数据。一分钟后窗口才是正式关闭.
兜底保证,窗口在关闭后,把数据输出到侧输出流,之后看到late的数据可以进行和之前的数据进行手动合并
先从时间窗口的使用位置看起
自定义延迟数据处理类:
val lateOutputTag = new OutputTag[SensorReading]( "late" )
// 迟到数据处理的三重保证机制: watermark(水位线) | allowedLateness(最大迟到数据) | sideOutputLateData(侧输出流)
val resultDStream: DataStream[SensorReading] = dataDstream
.keyBy( "id" ) //按什么分组,形成键控流
.timeWindow( Time.seconds( 5 ) ) //简便起见,这里使用滚动窗口
.allowedLateness( Time.minutes( 1 ) ) //允许的数据最大延迟时间,则触发窗口关闭的时间为(窗口长度+Watermark时长+允许数据延迟的时间, 本例中为:5+1+60)
.sideOutputLateData( lateOutputTag )
.reduce( new MyReduceFunc() )
这里设置为滚动窗口, 窗口大小 5 秒钟.
监控流scala处理类
KeyedStream.scala
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
@Public
class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
@Internal
def getKeyType = javaStream.getKeyType()
def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
new WindowedStream(javaStream.timeWindow(size))
}
从这句代码 及 import 语句中, 可以看到:
new WindowedStream(javaStream.timeWindow(size))
WindowsStream的传参中,使用了KeyedStream.java里的: javaStream里的方法,
监控流java处理类
KeyedStream.java
@Public public class KeyedStreamextends DataStream { private final KeySelector keySelector; private final TypeInformation keyType; // ------------------------------------------------------------------------ // Windowing // ------------------------------------------------------------------------ public WindowedStream timeWindow(Time size) { if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) { return window(TumblingProcessingTimeWindows.of(size)); } else { return window(TumblingEventTimeWindows.of(size)); } }
滚动事件窗口(EventTimeWindow)java处理类
TumblingEventTimeWindows.java
@PublicEvolving public class TumblingEventTimeWindows extends WindowAssigner
从分配窗口(assignWindows)方法中, 可以看到窗口的起始位置的计算来自于:
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
时间窗口(TimeWindow)java处理类
TimeWindow.java
进一步在 TimeWindow.java 类中, 可以看到如下方法:
@PublicEvolving
public class TimeWindow extends Window {
private final long start;
private final long end;
public TimeWindow(long start, long end) {
this.start = start;
this.end = end;
}
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
从该算法中可以看出窗口开始位置的计算公式为:
timestamp - (timestamp - offset + windowSize) % windowSize
在 (timestamp - offset + windowSize) % windowSize 里, 先减去偏移量, 再加上窗口大小, 最后再与窗口大小取模, 该公式可以得出:
offset与windowSize的运算是避免时间戳取到负值先加上一个窗口大小, 再使用窗口取模, 则可理解为对数据大小的影响范围的贡献可以忽略.(timestamp - offset + windowSize) % windowSize 最终可以理解为对timestamp按窗口取模, 而整体的(timestamp - offset + windowSize) % windowSize可理解为对timestamp去除余数, 获得一个去除余数后的整数,
作为窗口的起始位置.



