栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【FLINK】浅谈Flink中对于乱序数据处理保证

【FLINK】浅谈Flink中对于乱序数据处理保证

 // TODO 1. 指定数据流以EventTime模式
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 设定周期性插入WaterMark时间 EventTime语义下默认为200毫秒,这里可以自定义
    env.getConfig.setAutoWatermarkInterval(50)
    // 输入数据示例: 1,1547718205,zhangsan
    val dataDstream: DataStream[User] = inputDStream
      .map(
        data => {
          val dataArray: Array[String] = data.split(",")
          User(dataArray(0), dataArray(1).toLong, dataArray(2))
        }
      )
      // TODO 2. 设定 WatereMark 较小延迟时间 Time.milliseconds(30),接收大部分乱序迟到数据
      // assignTimestampsAndWatermarks 处理乱序,相反 assignAscendingTimestamps 为处理不乱序
      // BoundedOutOfOrdernessTimestampExtractor 为周期性生成 watermark AssignerWithPeriodicWatermarks的实现
      //                                                        对应的 AssignerWithPunctuatedWatermarks 为每条数据后面插入watermark
      .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[User](Time.milliseconds(30)) {
        override def extractTimestamp(element: User): Long = element.timestamp * 1000
      } )
      .keyBy("id")
      // 窗口起始位置定义 return timestamp - (timestamp - offset + windowSize) % windowSize
      .timeWindow(Time.seconds(15))
      // TODO 3. 设定较长迟到数据等待
      // 真正的最后等待的迟到数据时间要在这个基础上还要加上设定的WaterMark延迟时间(1分钟 + 30毫秒) 超过这个时间会被放入侧输出流
      .allowedLateness(Time.minutes(1))
      // TODO 4. 超过等待时间的数据放入侧输出流
      .sideOutputLateData(new OutputTag[User]("late"))
      .reduce( (u1, u2) => User(u1.id, u1.timestamp.min(u2.timestamp), u1.name) )
      // TODO 5. 获取测输出流数据
      // .getSideOutput(new OutputTag[User]("late"))
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/753945.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号