注意,窗口是窗口,水位线是水位线,窗口会按照程序设计自动划分出来,不会被水位线影响到,水位线能影响到的只是窗口里的数据计算的触发点,也就是延迟窗口的右界线。
废话不多说,上代码,我这里有详细注释哦!如果有问题,请指正。
package window
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object tumbling {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//为每一条数据追加一个时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//socket数据,需要安装netcat(mac自带,win请百度),之后用命令在终端窗口输入nc -lk 11111
val stream = env.socketTextStream("localhost",11111)
// 对 stream添加水印 并进行处理并按 key 聚合
val streamKeyBy = stream.assignTimestampsAndWatermarks(
//有界无序时间戳提取程序,设定水印的延迟时间,延时的是每个窗口右边的时间,
// 比如等于5秒的数据过来了那就会等到5+延时间的数据过来后,该窗口才会被触发。
new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(3)) {
//获取eventTime 数据格式为 ”时间 单词“
override def extractTimestamp(t: String): Long = {
val eventTime = t.split(" ")(0).toLong
println(eventTime)
eventTime
}}
).map(item =>(item.split(" ")(1),1)).keyBy(0)
//引入滑动窗口
val streamTumblingWindow = streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(3)))
//执行聚合操作
val streamReduce = streamTumblingWindow.reduce((item1,item2)=>(item1._1,item1._2 + item2._2))
streamReduce.print
env.execute("TumblingWindowJob")
}
}
注意:结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。
一、设置了wartermark为0的情况结果,也就是不会影响到窗口的延迟触发计算:
1634091720000
1634091721000
1634091722000
1634091723000----->触发计算的位置
6> (a,3)
1634091724000
1634091725000
1634091726000----->触发计算的位置,左开右闭,计算的是3,4,5
6> (a,3)
二、如果设置了wartermark ,那么对应的窗口也会延迟启动,也就是触发时间也会向后移,如下:newBoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(3)
streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(3))),
三秒一个的窗口,触发窗口计算的窗口时间也被延长了3秒
1634091720000
1634091721000
1634091722000
1634091723000------>本来应该计算的位置
1634091724000
1634091721000------>混入乱序的数据
1634091722000------>混入乱序的数据
1634091725000
1634091726000------>触发计算的位置,计算的是0~3的这个窗口,而6对应的窗口,本该计算3,4,5了,但由于watermark的设置,需要等到9的数据过来才能触发。
6> (a,5)------>统计值5为1,2,3+乱序的数据。
重新启动程序,输入以下数据,9的数据过来后才能触发计算
1634091720000 a
1634091721000 a
1634091722000 a
1634091723000 a
1634091724000 a
1634091721000 a
1634091722000 a
1634091725000 a
1634091726000 a
1634091727000 a
1634091728000 a
1634091729000 a
6> (a,5)
6> (a,3)



