栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink时间窗口

Flink时间窗口

Flink时间窗口 1. Time

Event time:数据真正产生的时间
access:time
应该是属于数据/时间的一个部分
watermark: 肯定和EventTIme相关 允许数据迟到多久
优点:
执行结果确定的
乱序、延时
缺点:
延迟

Ingestion time: 进入flink的时间
与机器时间有关系
Flink source operator 有关

processiong time:业务执行的时间
operator运行时间

2. window

window: 窗口
无限
分类: 时间窗口和数量窗口

TUMBLING WINDOW 滚动
时间对齐,窗口长度固定,不重叠
BI统计
SLIDING WINDOW 滑动

SESSION WINDOW 会话窗口

GLOBAL WINDOW

3. 滚动window

数量窗口

    val stream = env.socketTextStream("hadoop001", 9527)
    stream.map(_.trim.toInt)
      .countWindowAll(5)
      .sum(0).print()

keyby后的数量窗口

    stream.map(x => {
      (x.trim.toInt , 1 )
    }).keyBy(x => x._1)
      .countWindowAll(5) // keyby 分组后的五条,也就是说一个partition内积累5条才展示
      .sum(1).print()

滚动窗口简单写法 默认是processing time

    stream.map(_.trim.toInt)
      .timeWindowAll(Time.seconds(5))
      .sum(0).print()

keyby后的数量窗口

    stream.map(x => {
      (x.trim.toInt , 1 )
    }).keyBy(x => x._1)
      .timeWindowAll(Time.seconds(5)) // TumblingProcessingTimeWindow
      .sum(1).print()
3. 滑动window

10秒一个窗口,每隔5秒滚一次

    stream.map(_.trim.toInt)
      .timeWindowAll(Time.seconds(10),Time.seconds(5))
      .sum(0).print()

开始有半个窗口
0 - 5
0 -10
5 -15
10 -20

带key

        stream.map(x => {
          (x.trim.toInt , 1 )
        }).keyBy(x => x._1)
          .timeWindowAll(Time.seconds(10),Time.seconds(5))
          .sum(1).print()
5. 窗口function 5.1 增量

增量聚合 来一条处理一条
求和

    stream.map(x => (1, x.trim.toInt))
      .keyBy(x=> x._1)
      .timeWindow(Time.seconds(5))
      .reduce((x,y) => {
        (x._1,(x._2 + y._2))
      }).print()


求平均

 
        stream
          .map(x => ("a", x.trim.toLong))
          .keyBy(x=> x._1)
          .timeWindow(Time.seconds(5))
          .aggregate(new myAverageAggFunction)
          .print()



class myAverageAggFunction extends AggregateFunction[(String,Long),(Long,Long) , Double] {
  // 初始化累加器, 赋初始值
  override def createAccumulator(): (Long, Long) = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)): (Long, Long) = {
    println("add ....... invoke ....." + value._1 + "      " + value._2)
    // 累加器里 第一个存放 和,  第二个存放 次数
    (accumulator._1 + value._2, accumulator._2 + 1L)
  }

  override def getResult(accumulator: (Long, Long)): Double = {
    // 总数 / 次数
    accumulator._1 / accumulator._2.toDouble
  }
  // 累加器的合并操作
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = {
    (a._1 + b._1, a._2 + b._2)
  }
}
5.2 全量

全量聚合 批次处理

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/280911.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号