栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink watermark(水位线)的实战理解

flink watermark(水位线)的实战理解

注意,窗口是窗口,水位线是水位线,窗口会按照程序设计自动划分出来,不会被水位线影响到,水位线能影响到的只是窗口里的数据计算的触发点,也就是延迟窗口的右界线。

废话不多说,上代码,我这里有详细注释哦!如果有问题,请指正。

package window

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object tumbling {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //为每一条数据追加一个时间特性
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //socket数据,需要安装netcat(mac自带,win请百度),之后用命令在终端窗口输入nc -lk 11111
    val stream = env.socketTextStream("localhost",11111)
    // 对 stream添加水印 并进行处理并按 key 聚合
    val streamKeyBy = stream.assignTimestampsAndWatermarks(
      //有界无序时间戳提取程序,设定水印的延迟时间,延时的是每个窗口右边的时间,
      // 比如等于5秒的数据过来了那就会等到5+延时间的数据过来后,该窗口才会被触发。
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(3)) {
        //获取eventTime 数据格式为 ”时间 单词“
        override def extractTimestamp(t: String): Long = {
          val eventTime = t.split(" ")(0).toLong
          println(eventTime)
          eventTime
        }}
    ).map(item =>(item.split(" ")(1),1)).keyBy(0)
    //引入滑动窗口
    val streamTumblingWindow = streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(3)))

    //执行聚合操作
    val streamReduce = streamTumblingWindow.reduce((item1,item2)=>(item1._1,item1._2 + item2._2))

    streamReduce.print
    env.execute("TumblingWindowJob")
  }
}
注意:结果是按照 Event Time 的时间窗口计算得出的,而无关系统的时间(包括输入的快慢)。
一、设置了wartermark为0的情况结果,也就是不会影响到窗口的延迟触发计算:
1634091720000
1634091721000
1634091722000
1634091723000----->触发计算的位置
6> (a,3)
1634091724000
1634091725000
1634091726000----->触发计算的位置,左开右闭,计算的是3,4,5
6> (a,3)

二、如果设置了wartermark  ,那么对应的窗口也会延迟启动,也就是触发时间也会向后移,如下:newBoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(3)
streamKeyBy.window(TumblingEventTimeWindows.of(Time.seconds(3))),
三秒一个的窗口,触发窗口计算的窗口时间也被延长了3秒
1634091720000
1634091721000
1634091722000
1634091723000------>本来应该计算的位置
1634091724000
1634091721000------>混入乱序的数据
1634091722000------>混入乱序的数据
1634091725000
1634091726000------>触发计算的位置,计算的是0~3的这个窗口,而6对应的窗口,本该计算3,4,5了,但由于watermark的设置,需要等到9的数据过来才能触发。
6> (a,5)------>统计值5为1,2,3+乱序的数据。

重新启动程序,输入以下数据,9的数据过来后才能触发计算
1634091720000 a
1634091721000 a
1634091722000 a
1634091723000 a
1634091724000 a
1634091721000 a
1634091722000 a
1634091725000 a
1634091726000 a
1634091727000 a
1634091728000 a
1634091729000 a
6> (a,5)
6> (a,3)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327214.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号