Event time:数据真正产生的时间
access:time
应该是属于数据/时间的一个部分
watermark: 肯定和EventTIme相关 允许数据迟到多久
优点:
执行结果确定的
乱序、延时
缺点:
延迟
Ingestion time: 进入flink的时间
与机器时间有关系
Flink source operator 有关
processiong time:业务执行的时间
operator运行时间
window: 窗口
无限
分类: 时间窗口和数量窗口
TUMBLING WINDOW 滚动
时间对齐,窗口长度固定,不重叠
BI统计
SLIDING WINDOW 滑动
SESSION WINDOW 会话窗口
GLOBAL WINDOW
3. 滚动window数量窗口
val stream = env.socketTextStream("hadoop001", 9527)
stream.map(_.trim.toInt)
.countWindowAll(5)
.sum(0).print()
keyby后的数量窗口
stream.map(x => {
(x.trim.toInt , 1 )
}).keyBy(x => x._1)
.countWindowAll(5) // keyby 分组后的五条,也就是说一个partition内积累5条才展示
.sum(1).print()
滚动窗口简单写法 默认是processing time
stream.map(_.trim.toInt)
.timeWindowAll(Time.seconds(5))
.sum(0).print()
keyby后的数量窗口
stream.map(x => {
(x.trim.toInt , 1 )
}).keyBy(x => x._1)
.timeWindowAll(Time.seconds(5)) // TumblingProcessingTimeWindow
.sum(1).print()
3. 滑动window
10秒一个窗口,每隔5秒滚一次
stream.map(_.trim.toInt)
.timeWindowAll(Time.seconds(10),Time.seconds(5))
.sum(0).print()
开始有半个窗口
0 - 5
0 -10
5 -15
10 -20
带key
stream.map(x => {
(x.trim.toInt , 1 )
}).keyBy(x => x._1)
.timeWindowAll(Time.seconds(10),Time.seconds(5))
.sum(1).print()
5. 窗口function
5.1 增量
增量聚合 来一条处理一条
求和
stream.map(x => (1, x.trim.toInt))
.keyBy(x=> x._1)
.timeWindow(Time.seconds(5))
.reduce((x,y) => {
(x._1,(x._2 + y._2))
}).print()
求平均
stream
.map(x => ("a", x.trim.toLong))
.keyBy(x=> x._1)
.timeWindow(Time.seconds(5))
.aggregate(new myAverageAggFunction)
.print()
class myAverageAggFunction extends AggregateFunction[(String,Long),(Long,Long) , Double] {
// 初始化累加器, 赋初始值
override def createAccumulator(): (Long, Long) = (0L, 0L)
override def add(value: (String, Long), accumulator: (Long, Long)): (Long, Long) = {
println("add ....... invoke ....." + value._1 + " " + value._2)
// 累加器里 第一个存放 和, 第二个存放 次数
(accumulator._1 + value._2, accumulator._2 + 1L)
}
override def getResult(accumulator: (Long, Long)): Double = {
// 总数 / 次数
accumulator._1 / accumulator._2.toDouble
}
// 累加器的合并操作
override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = {
(a._1 + b._1, a._2 + b._2)
}
}
5.2 全量
全量聚合 批次处理



