// Event-time streaming environment with parallelism set to 1.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

// In-memory sample input; every record has the form "<key>,<epoch-millis timestamp>".
val sourceStream = env
  .fromCollection(
    List(
      "a,1646150400000",
      "a,1646150403000",
      "a,1646150406000",
      "a,1646150410000",
      "a,1646150415000",
      "b,1646150400000",
      "b,1646150403000",
      "b,1646150410000",
      "b,1646150415000",
      "c,1646150400000",
      "c,1646150403000",
      "c,1646150415000"
    )
  )
  .assignTimestampsAndWatermarks(
    // Timestamp extractor configured with a 3-second out-of-orderness bound.
    new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(3)) {
      // The event timestamp is the second comma-separated field of the record.
      override def extractTimestamp(record: String): Long = {
        val fields = record.split(",")
        fields(1).toLong
      }
    }
  )
// Per-key record counts over a 10-second window that slides every 5 seconds.
// Each emitted tuple is (window end timestamp, key, count).
// Named `aggData` so that it matches the references that follow this definition.
val aggData: DataStream[(Long, String, Int)] = sourceStream
  .map { line =>
    // Keep only the key (first comma-separated field) paired with an initial count of 1.
    val key = line.split(",")(0)
    (key, 1)
  }
  .keyBy(_._1)
  .timeWindow(Time.seconds(10), Time.seconds(5))
  .reduce(
    // Incrementally sum the counts of two records that share a key and window.
    (left: (String, Int), right: (String, Int)) => (left._1, left._2 + right._2),
    // Type parameters: IN, OUT, KEY, W
    new ProcessWindowFunction[(String, Int), (Long, String, Int), String, TimeWindow] {
      // Tags every reduced element with the end timestamp of the window it belongs to.
      override def process(key: String, ctx: Context, elements: Iterable[(String, Int)], out: Collector[(Long, String, Int)]): Unit = {
        val windowEnd = ctx.window.getEnd
        elements.foreach { case (k, count) => out.collect((windowEnd, k, count)) }
      }
    }
  )
// aggData.print()  // debug aid: uncomment to inspect the per-key window counts

// Gather the (windowEnd, key, count) tuples into 5-second tumbling all-windows and print
// each window's contents as one list, ordered by count from highest to lowest.
// No keyBy is applied first: an all-window processes the whole stream regardless of keying,
// so partitioning beforehand would only add a shuffle.
aggData
  .timeWindowAll(Time.seconds(5))
  .apply(new AllWindowFunction[(Long, String, Int), String, TimeWindow] {
    override def apply(window: TimeWindow, input: Iterable[(Long, String, Int)], out: Collector[String]): Unit = {
      // Ascending sort followed by reverse gives descending counts.
      val ranked = input.toList.sortBy(_._3).reverse
      out.collect(ranked.toString())
    }
  })
  .print()

// Submit the job; nothing above runs until this call.
env.execute()