1、watermark周期性生成,默认是200ms,可以修改为500msexecutionEnvironment.getConfig.setAutoWatermarkInterval(500)
2、WindowAssigner包括TumblingProcessingTimeWindows SlidingProcessingTimeWindows
3、window的API
sum,max,min,reduce(传入reduceFucntion )、aggreation增量计算,来一条数据就进行计算,等到窗口结束时直接输出之前算好的数据结果,效率高
process(传入processWiindowFucntion),appply(传入windowFunction)全窗口计算,先把窗口的数据收集起来,等到窗口结束的时候会遍历所有的数据,效率低,有些场景增量聚合的数没太大用,比如排序、算中位数, 全窗口函数获得信息更多,比如从上下文获得窗口信息,状态信息,更底层更灵活
4、其他可选的API
trigger触发器:定义窗口什么时候关闭,什么时候触发窗口进行计算并输出结果
evictor移除器:定义移出某些数据的逻辑,类似于filter
allowedlateness:传入时间,允许处理迟到的数据的时间,这些迟到数据会进入正常的流计算中
sideoutputlateData:将迟到数据放到侧输出流,如果过了allowedlateness的时间,就会放到侧输出流
getsideoutput:获取侧输出流
package flinkSourse
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
object FlinkWindow {
def main(args: Array[String]): Unit = {
val executionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
executionEnvironment.setParallelism(1)
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //watermark周期性生成,默认是200ms
// executionEnvironment.getConfig.setAutoWatermarkInterval(500) //watermark周期性生成,默认是200ms,可以修改为500ms
val stream2: DataStream[String] = executionEnvironment.socketTextStream("127.0.0.1", 1111)
val transforStream: DataStream[SensorReading] = stream2.map(data => {
val tmpList: Array[String] = data.split(",")
SensorReading(tmpList(0), tmpList(1).toLong, tmpList(2).toDouble)
})
//增加watermark配置.punctated代表点状的,periodic代表周期性的(一般用这个)
val transforEventStream: DataStream[SensorReading] = transforStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
override def extractTimestamp(t: SensorReading) = t.timestamp * 1000
})
// WindowAssigner包括TumblingProcessingTimeWindows SlidingProcessingTimeWindows
// transforStream.keyBy(_.id).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
// transforStream.keyBy(_.id).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(2)))
// //简单写法
// transforStream.keyBy(_.id).timeWindow(Time.seconds(10))
// transforStream.keyBy(_.id).timeWindow(Time.seconds(10), Time.seconds(2))
//window的API
//sum,max,min,reduce(传入reduceFucntion )、aggreation增量计算,来一条数据就进行计算,等到窗口结束时直接输出之前算好的数据结果,效率高
//process(传入processWiindowFucntion),appply(传入windowFunction)全窗口计算,先把窗口的数据收集起来,等到窗口结束的时候会遍历所有的数据,效率低,有些场景增量聚合的数没太大用,比如排序、算中位数,
// 全窗口函数获得信息更多,比如从上下文获得窗口信息,状态信息,更底层更灵活
//其他可选的API
//trigger触发器:定义窗口什么时候关闭,什么时候触发窗口进行计算并输出结果
//evictor移除器:定义移出某些数据的逻辑,类似于filter
//allowedlateness:传入时间,允许处理迟到的数据的时间,这些迟到数据会进入正常的流计算中
//sideoutputlateData:将迟到数据放到侧输出流,如果过了allowedlateness的时间,就会放到侧输出流
//getsideoutput:获取侧输出流
//统计每15秒每个传感器温度的最小值,时间戳最大值
val lateTag = new OutputTag[SensorReading]("late")
val windowStream: DataStream[SensorReading] = transforEventStream
.keyBy("id")
.timeWindow(Time.seconds(15))
.allowedLateness(Time.minutes(1))
.sideOutputLateData(lateTag)
.reduce((curdata, newdata) => {
SensorReading(curdata.id, newdata.timestamp, curdata.temperature.min(newdata.temperature))
})
//窗口start的创建公式 timestamp - (timestamp - offset + windowSize) % windowSize,第一条数据是1547718199,窗口为[195-210),[210-225),[225-240)
//1547718199000-(1547718199000+15000)%15000
val lateStream = windowStream.getSideOutput(lateTag)
windowStream.print("windowSteam")
lateStream.print("lateStream")
executionEnvironment.execute("flinkWindow ")
}
}



