(1)事件时间(event time):数据里面自带的时间
(2)摄入时间(Ingestion TIme
):以source的系统时间为准
(3)处理时间(Processing TIm):处理数据的时间(北京时间…)
水位线:默认等于最大的时间戳
—将60s分成0-5,5-10,达到水位线就执行
—使用处理时间就是直接写代码timeWindow(Time.seconds(5))设置就可以了
—使用事件时间需要手动设置 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
—需要指定哪个字段为时间.assignTimestampsAndWatermarks
—多个分区数据同时被消费,可能数据乱序,导致数据的丢失,设置水位线
—将水位线前移,这样可以保证数据不会丢失
—并行度设置为1,由于每隔task是独立的,所以当某一个task数据达到水位线时 就会执行,但是其他的task中还没有达到,可能导致数据的丢失
—但是在大数据环境中,数据量比较大,水位线基本会一致
object Demo03Event {
def main(args: Array[String]): Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//设置flink时间模式为事件时间
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "master:9092")
properties.setProperty("group.id", "test1")
//创建kafka的消费者
val flinkKafkaCusumor = new FlinkKafkaConsumer[String]("words", new SimpleStringSchema(), properties)
val kafkaDS: DataStream[String] = env.addSource(flinkKafkaCusumor)
//每隔5s统计id出现的次数
val eventDS = kafkaDS.map(line => {
val splits = line.split(",")
val id = splits(0)
val time = splits(1)
(id, time.toLong)
})
//告诉flink哪个字段为事件时间字段
//.assignAscendingTimestamps(_._2)
//指定水位线,水位线默认等于时间戳最大的时间
//数据乱序,将水位线前移5s,这样会等5s才会触发
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.seconds(5)) {
override def extractTimestamp(element: (String, Long)): Long = {
element._2
}
})
eventDS.map(kv => (kv._1, 1))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.sum(1)
.print()
env.execute()
}
}



