目录
window的类型
window API使用方法
EventTime
乱序数据的处理
水位线(Watermark)
watermark的传递
watermark的使用
Windows 是处理无限流的核心。Windows 将流分成有限大小的“桶”,我们可以在这些桶上应用计算。也就是类似spark streaming的默认模式,每隔一段时间,计算前一段时间窗口内的数据。
window的类型
时间窗口(TimeWindow):滚动时间窗口、滑动时间窗口、会话窗口
计数窗口(CountWindow):滚动计数窗口、滑动计数窗口
滚动窗口
根据固定时间长度对数据进行切分,没有重叠的数据
滑动窗口:
滑动窗口由固定窗口长度和滑动间隔组成,数据有重叠,范围是左开右闭
会话窗口
指定一个timeout的时间间隔,如果在这一段时间没有接受新数据,就会形成新的窗口
window API使用方法
使用.window() 来定义一个窗口,这个window()方法必须在keyBy之后才可以使用
flink可以通过.timeWindow和.countWindow方法,用于定义时间窗口和计数窗口
timeWindow和countWindow的使用
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{ProcessingTimeSessionWindows, SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
object timeWindowTest1 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val lineDS: DataStream[String] = env.socketTextStream("doker",8888)
val wordDS: DataStream[String] = lineDS.flatMap(_.split(","))
val kvDS = wordDS.map((_, 1))
val keyByDS: KeyedStream[(String, Int), String] = kvDS.keyBy(_._1)
val TumblingwindowDS: WindowedStream[(String, Int), String, TimeWindow] =
keyByDS.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
val SlidingwindowDS: WindowedStream[(String, Int), String, TimeWindow] =
keyByDS.window(SlidingProcessingTimeWindows.of(Time.seconds(15),Time.seconds(5)))
val TumblingwindowDS_short = keyByDS.timeWindow(Time.seconds(15))
val SlidingwindowDS_short = keyByDS.timeWindow(Time.seconds(15),Time.seconds(5))
val SessionDS = keyByDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
val TumblingcountWindowDS = keyByDS.countWindow(10)
val SlidingcountWindowDS=keyByDS.countWindow(10,5)
val countDS1: DataStream[(String, Int)] = TumblingwindowDS.reduce((x,y)=>(x._1,y._2+x._2))
val countDS2: DataStream[(String, Int)] = SlidingwindowDS.reduce((x,y)=>(x._1,y._2+x._2))
val countDS3: DataStream[(String, Int)] = TumblingcountWindowDS.reduce((x,y)=>(x._1,y._2+x._2))
// countDS1.print()
// countDS2.print()
countDS3.print()
env.execute()
}
}
窗口分配器(window assigner)有如下四种:
在使用窗口之后,还需要使用窗口函数:
可以分为两类:
1、增量聚合函数(incremetal aggregation functions)
每条数据到来就进行计算,保持一个简单的状态
比如 ReduceFunction,AggreagteFunction
2、全窗口函数(full window functions)
把窗口所有的数据收集起来,等到计算的时候遍历所有数据
比如 ProcessfunWindowFunction
其它可选window API
1、trigger() —— 触发器,定义 window 什么时候关闭,触发计算并输出结果
2、evictor() —— 移除器,定义移除某些数据的逻辑
3、allowedLateness() —— 允许处理迟到的数据
4、sideOutputLateData() —— 将迟到的数据放入侧输出流
5、getSideOutput() —— 获取侧输出流
具体的使用选择
EventTime
事件时间(event time): 事件产生的时间
摄取时间(ingestion time): 数据进入Flink的时间,也就是source读取数据的时间
处理时间(processing time):操作算子处理数据的时间
举个例子,比如我现在要监控实时水位高度,那么现在应改选取什么时间呢?毫无疑问应该选取事件时间,现在我希望每5s就返回水位的最高值,(如果是processing time,他舍弃了数据本身的时间,进入到程序中的数据一定是有延迟的),但是由于分布式机制和网络延迟导致数据可能不是按照发生的顺序发送来,可能一部分数据没有被处理到,针对这种现象,提出了WaterMarks处理乱序数据。
乱序数据的处理
此张图中展示了乱序对数据处理带来的影响,理想情况下的数据是按照顺序发送来的,但是现实情况下,可能是第二种情况;
首先是理想情况:设置窗口大小为5,接收到第六条数据的时候,开始计算前一个窗口内的数据
再来是实际情况:我们接收前5s的数据,到5s后窗口关闭了,这时候非常尴尬,2和3s的数据没有处理到,那么放进下一窗口可以吗?明显不可以,下一个窗口是6-10s的数据,他也不要这两条数据
遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口
水位线(Watermark)
Watermark是一种衡量Event Time进展的机制,可以设定延迟触发Watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用Watermark机制结合window来实现;
数据流中的 Watermark 用于表示timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。watermark 用来让程序自己平衡延迟和结果正确性
默认的watermark等于数据中的事件戳(只增不减)
看一下BoundedOutOfOrdernessTimestampExtractor方法中,指定了Watermark的计算方式
watermark等于目前接收到的数据中的最大时间戳减去最大乱序时间
所以上面的乱序数据有了解决方法,看下图,设置了延迟时间后,在6s后,watermark才等于5,这时5s前的数据全部接收到了,窗口可以关闭
watermark的传递
首先watermark会向下游的task发送,可以看到图中上游的不同分区都向这个下游的task中发送watermark,但是每一个task的watermark也是挑选出分区中最小的那个作为基准
watermark的使用
还是一个wordcount的案例,左边是单词,右边是时间戳
java,1646583690000
java,1646583691000
java,1646583692000
java,1646583695000
java,1646583696000
java,1646583697000
java,1646583694000
java,1646583698000
AssignerWithPeriodicWatermarks 这样会来一个数据就生成一个watermark
AssignerWithPunctuatedWatermarks 这种方式不是固定时间的,根据需要对每条数据进行筛选和处理
(下图不太准确,具体情况比较复杂)
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object WindowEventTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//指定时间模式事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//在有多个分区的时候,必须要保持每个分区的水位线是相同的,所以为了方便这里就设置分区为1
env.setParallelism(1)
val linesDS = env.socketTextStream("doker", 8888)
val wordDS = linesDS.map(line => {
val splits = line.split(",")
val value = splits(0)
//时间戳单位需要为ms
val time = splits(1).toLong
(value, time)
})
val waterDS = wordDS.assignTimestampsAndWatermarks(
//指定时间戳的乱序时间
new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(3)) {
//指定数据里面时间戳位置
override def extractTimestamp(element: (String, Long)): Long = element._2
}
)
val countDS = waterDS
.map(kv=>(kv._1,1))
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1)
countDS.print()
env.execute()
}
}
结果时4没有错
java,1646583690000
java,1646583691000
java,1646583692000
java,1646583695000
java,1646583696000
java,1646583697000
java,1646583694000
java,1646583698000 (此时的watermark=1646583698000-3000,所以java,1646583695000之前的数据都得到了计算)窗口的计算,就是从第一条数据的时间开始,结束时间为
java,1646583690000~java,1646583695000 左闭右开,总共有4个单词
怎么确定窗口的起始时间?
比如现在timestamp=6s,窗口大小为5s
6-(6+5)%5=5,窗口起始时间为5s
对于乱序数据,其实也可以用旁路输出



