
Flink001---Setting the window start with an offset


Intro

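For a tumbling window, how do you control the point where each window starts? An example:

  • Watermark delay (allowed out-of-orderness) set to 3s
  • Tumbling window length set to 5s
  • Offset set to 3s, so [3,8) is one window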
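The window a timestamp falls into follows from simple modular arithmetic on the size and the offset. The Scala sketch below is a plain re-implementation, assuming it matches the calculation in Flink's `TimeWindow.getWindowStartWithOffset`; it works in seconds for readability, while Flink itself works in milliseconds. `WindowStartSketch` and `windowStart` are hypothetical names.

```scala
// Sketch of how a tumbling window with an offset picks its start.
// Assumption: mirrors the arithmetic of Flink's TimeWindow.getWindowStartWithOffset;
// values are in seconds here for readability (Flink uses milliseconds).
object WindowStartSketch {
  // Start of the window containing `timestamp`, for windows of length `size` shifted by `offset`
  def windowStart(timestamp: Long, offset: Long, size: Long): Long = {
    val remainder = (timestamp - offset) % size
    // Scala's % keeps the sign of the dividend, so fold negative remainders back into [0, size)
    if (remainder < 0) timestamp - (remainder + size) else timestamp - remainder
  }

  def main(args: Array[String]): Unit = {
    val size = 5L
    val offset = 3L
    // Seconds 0..10 relative to a base that is a multiple of 5 (as 1609473600 is)
    for (t <- 0L to 10L) {
      val start = windowStart(t, offset, size)
      println(s"t=$t -> [$start, ${start + size})")
    }
  }
}
```

With size 5 and offset 3, seconds 0 to 2 land in [-2,3), 3 to 7 in [3,8), and 8 to 10 in [8,13). The offset only shifts the grid; every window keeps the same length. Flink also requires the absolute value of the offset to be smaller than the window size.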
Code

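There isn't much to say about the code; it just passes an offset as the second argument to TumblingEventTimeWindows.of.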

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
// The author's own classes (not shown in the post):
// SensorReading is a case class built as SensorReading(id, timestamp in seconds, temperature);
// convertTimeStamp2DateStr turns an epoch-seconds timestamp into a date-time string
import com.flink.sourceread.SensorReading
import com.util.TimeUtils.convertTimeStamp2DateStr

import scala.collection.mutable.ArrayBuffer
object waterMarkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Use event-time semantics
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Emit a periodic watermark every 50 ms; windows can only fire when a new
    // watermark arrives, so completed windows are checked roughly every 50 ms
    env.getConfig.setAutoWatermarkInterval(50)

    // On Windows, feed input by typing lines into `nc -lp 7777`
    val inputStream = env.socketTextStream("localhost", 7777)

    // First parse each line into the SensorReading case class (a simple map)
    val dataStream = inputStream
      .map(data => {
        val arr = data.trim.split(",")
        SensorReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble)
      })
      // Watermark = largest timestamp seen so far minus 3 s of allowed out-of-orderness
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L // seconds -> milliseconds
      })
    // Per sensor and per 5 s window: collect every record's timestamp, and keep the
    // timestamp (raw and formatted) of the most recently arrived record
    val resultStream = dataStream
      .map(
        data => (data.id, data.timestamp, ArrayBuffer(data.timestamp), convertTimeStamp2DateStr(data.timestamp))
      )
      .keyBy(_._1) // key by the first tuple element (sensor id)
      // Tumbling event-time window: 5 s size, 3 s offset, so windows start at
      // seconds ending in 3 or 8, e.g. [3,8), [8,13)
      .window(TumblingEventTimeWindows.of(Time.seconds(5), Time.seconds(3)))
      .reduce((curRes, newData) => (curRes._1, newData._2, curRes._3 ++ newData._3, convertTimeStamp2DateStr(newData._2)))
    resultStream.print("result")
    env.execute("window test")
  }

}
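The job depends on `SensorReading` and `convertTimeStamp2DateStr` from the author's own project, which the post does not show. The sketch below is a hypothetical stand-in for both, plus the job's reduce step applied to plain Scala collections, so the merge logic can be checked without a Flink cluster. It assumes timestamps are epoch seconds formatted in UTC+8 (`Asia/Shanghai`), which is consistent with the times printed in the test output; the `temperature` field name and the `ReduceSketch` object are illustrative guesses.

```scala
// Hypothetical stand-ins for the author's SensorReading and convertTimeStamp2DateStr,
// plus the window's reduce step run on plain Scala data (no Flink involved).
// Assumptions: epoch-second timestamps, formatted in UTC+8; `temperature` is a guessed field name.
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ArrayBuffer

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object ReduceSketch {
  private val formatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneId.of("Asia/Shanghai"))

  // Hypothetical equivalent of com.util.TimeUtils.convertTimeStamp2DateStr
  def convertTimeStamp2DateStr(epochSeconds: Long): String =
    formatter.format(Instant.ofEpochSecond(epochSeconds))

  // (id, latest timestamp, all timestamps, formatted latest timestamp)
  type Acc = (String, Long, ArrayBuffer[Long], String)

  // Same shape as the job's map step
  def toAcc(r: SensorReading): Acc =
    (r.id, r.timestamp, ArrayBuffer(r.timestamp), convertTimeStamp2DateStr(r.timestamp))

  // Same merge as the job's reduce: keep the id, take the newer record's timestamp,
  // concatenate the collected timestamps, and re-format the newer timestamp
  def merge(curRes: Acc, newData: Acc): Acc =
    (curRes._1, newData._2, curRes._3 ++ newData._3, convertTimeStamp2DateStr(newData._2))

  def main(args: Array[String]): Unit = {
    val window = Seq(1609473600L, 1609473601L, 1609473602L).map(t => SensorReading("sensor_1", t, 35.8))
    println(window.map(toAcc).reduce((a, b) => merge(a, b)))
  }
}
```

Because `merge` takes the timestamp of whichever record arrived later, the second field is the latest timestamp only when records arrive in order, as they do in this test; using `math.max(curRes._2, newData._2)` would make it independent of arrival order.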
Test run

input:

sensor_1", 1609473600, 35.8
sensor_1", 1609473601, 35.8
sensor_1", 1609473602, 35.8
sensor_1", 1609473603, 35.8
sensor_1", 1609473604, 35.8
sensor_1", 1609473605, 35.8
sensor_1", 1609473606, 35.8
sensor_1", 1609473607, 35.8
sensor_1", 1609473608, 35.8
sensor_1", 1609473609, 35.8
sensor_1", 1609473610, 35.8
sensor_1", 1609473620, 35.8

output:

result> (sensor_1",1609473602,ArrayBuffer(1609473600, 1609473601, 1609473602),2021-01-01 12:00:02)
result> (sensor_1",1609473607,ArrayBuffer(1609473603, 1609473604, 1609473605, 1609473606, 1609473607),2021-01-01 12:00:07)
result> (sensor_1",1609473610,ArrayBuffer(1609473608, 1609473609, 1609473610),2021-01-01 12:00:10)

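What the test shows:

  • Feed in records for seconds 0 to 10 (relative to 1609473600), plus one record at second 20 whose job is to advance the watermark far enough for the remaining windows to fire
  • The records fall into three windows, [0,1,2], [3,4,5,6,7] and [8,9,10], so the window boundaries sit at seconds 3 and 8 as intended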
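The timing of the output comes from when event-time windows fire: a window [start, end) is emitted once the watermark reaches end - 1 ms, and here the watermark trails the largest timestamp seen by 3 s. The plain-Scala simulation below replays the test input under those assumptions (parallelism 1, in-order input, a watermark update after every record rather than every 50 ms). It illustrates the trigger rule and is not Flink code; `FiringSimulation` is a hypothetical name.

```scala
// Hypothetical simulation of the test run (plain Scala, no Flink).
// Assumptions: parallelism 1, in-order records, watermark = max timestamp - 3 s,
// and a window [start, end) fires once the watermark reaches end - 1 ms.
import scala.collection.mutable

object FiringSimulation {
  val SizeMs = 5000L
  val OffsetMs = 3000L
  val DelayMs = 3000L

  // Start of the tumbling window (size 5 s, offset 3 s) containing ts
  def windowStart(ts: Long): Long = {
    val r = (ts - OffsetMs) % SizeMs
    if (r < 0) ts - (r + SizeMs) else ts - r
  }

  // Returns the contents (in seconds) of each window, in the order the windows fire
  def run(inputSeconds: Seq[Long]): Seq[Seq[Long]] = {
    val open = mutable.TreeMap.empty[Long, mutable.ArrayBuffer[Long]] // window start -> contents
    val fired = mutable.ArrayBuffer.empty[Seq[Long]]
    var maxTs = Long.MinValue
    for (sec <- inputSeconds) {
      val ts = sec * 1000L
      open.getOrElseUpdate(windowStart(ts), mutable.ArrayBuffer.empty) += sec
      maxTs = math.max(maxTs, ts)
      val watermark = maxTs - DelayMs
      // Fire every open window whose last millisecond is covered by the watermark
      val ready = open.keys.filter(start => start + SizeMs - 1 <= watermark).toList
      for (start <- ready) {
        fired += open(start).toList
        open -= start
      }
    }
    fired.toList
  }

  def main(args: Array[String]): Unit = {
    val input = (1609473600L to 1609473610L) :+ 1609473620L
    run(input).foreach(w => println(w.mkString(", ")))
  }
}
```

Replaying the input without the last record fires only the first window: the watermark then stops 7 s past the base, just short of the 7.999 s needed to close [3,8), so [3,8) and [8,13) stay open. The record at second 20 itself lands in [18,23), which is why it never appears in the output.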

                                Written 2021-12-03 at Jiulonghu, Jiangning District, Nanjing
