栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink延迟数据处理3件套

Flink延迟数据处理3件套

Flink延迟数据处理3件套

| watermark(水位线)| allowedLateness(最大迟到数据)| sideOutputLateData(侧输出流)

样例代码
package com.andy.flink.demo.datastream.sideoutputs

import com.andy.flink.demo.datastream.sideoutputs.FlinkHandleLateDataTest2.SensorReading
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


object FlinkHandleLateDataTest2 {

  //定义类定义实体Model
  case class SensorReading(id: String,
                           timestamp: Long,
                           temperature: Double)

  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism( 1 )
    // 从调用时刻开始给env创建的每一个stream追加时间特征
    env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime )
    // 设置watermark的默认生成周期(单位:毫秒) -> 100毫秒生成一个WaterMark. 全局设置, 算子中如果设置将覆盖该全局设置
    env.getConfig.setAutoWatermarkInterval( 100L )

    val inputDStream: DataStream[String] = env.socketTextStream( "localhost", 9999 )

    val dataDstream: DataStream[SensorReading] = inputDStream
      .map( data => {
        val dataArray: Array[String] = data.split( "," )
        SensorReading( dataArray( 0 ), dataArray( 1 ).toLong, dataArray( 2 ).toDouble )
      } )
      // .assignAscendingTimestamps( _.timestamp * 1000L ) // 最理想状态:数据无延迟,按时间正序到达(这种理想情况下,直接指定时间戳字段就可以了)
      .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]
      // 给WaterMark的一个初始值延时时间,一般该值应能够覆盖住70%~80%左右的延迟数据
      ( Time.milliseconds( 1000 ) ) {
        // 指定时间戳字段以秒为单位 * 1000(这里需要使用 ms 单位,数据中的时间请自行转换为毫秒)
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      } )

    val lateOutputTag = new OutputTag[SensorReading]( "late" )

    // 迟到数据处理的三重保证机制: watermark(水位线) | allowedLateness(最大迟到数据)  | sideOutputLateData(侧输出流)
    val resultDStream: DataStream[SensorReading] = dataDstream
      .keyBy( "id" ) //按什么分组,形成键控流
      .timeWindow( Time.seconds( 5 ) ) //简便起见,这里使用滚动窗口
      .allowedLateness( Time.minutes( 1 ) ) //允许的数据最大延迟时间,则触发窗口关闭的时间为(窗口长度+Watermark时长+允许数据延迟的时间, 本例中为:5+1+60)
      .sideOutputLateData( lateOutputTag )
      .reduce( new MyReduceFunc() )

    dataDstream.print( "main-flow" )
    resultDStream.print( "result-flow" )
    // 获取侧输出流late并打印输出
    resultDStream.getSideOutput( lateOutputTag ).print( "late-flow" )

    env.execute( "FlinkHandleLateDataTest2" )
  }
}


class MyReduceFunc extends ReduceFunction[SensorReading] {
  override def reduce(value1: SensorReading,
                      value2: SensorReading): SensorReading = {
    SensorReading(
      value1.id,
      value2.timestamp,
      value1.temperature.min( value2.temperature )
    )
  }
}
三件套之一 : 水平线watermark

窗口关闭为5秒,watermark延时为1秒,所以其实窗口数据只要[0,5), 5取不到(其实具体的时间戳的窗口从哪开始到那结束会有一个方法,并不是第一条数据时间戳是0就往后延长5秒,下面会看到),但是因为watermark延长了1s 输出到了时间戳6秒的时候才会进行输出,这时候窗口并没有关闭,因为我们设置了迟到数据allowedLateness

三件套之二 : 迟到数据allowedLateness

迟到数据设置成了一分钟,这一分钟之类所有的在[0,5)时间戳里的数据都会进行输出,来一条数据,直接输出一条数据。一分钟后窗口才是正式关闭.

三件套之三 : 侧输出流sideOutputLateData

兜底保证,窗口在关闭后,把数据输出到侧输出流,之后看到late的数据可以进行和之前的数据进行手动合并

实际窗口时间戳程序底层算法
    先从时间窗口的使用位置看起

自定义延迟数据处理类:

val lateOutputTag = new OutputTag[SensorReading]( "late" )

    // 迟到数据处理的三重保证机制: watermark(水位线) | allowedLateness(最大迟到数据)  | sideOutputLateData(侧输出流)
    val resultDStream: DataStream[SensorReading] = dataDstream
      .keyBy( "id" ) //按什么分组,形成键控流
      .timeWindow( Time.seconds( 5 ) ) //简便起见,这里使用滚动窗口
      .allowedLateness( Time.minutes( 1 ) ) //允许的数据最大延迟时间,则触发窗口关闭的时间为(窗口长度+Watermark时长+允许数据延迟的时间, 本例中为:5+1+60)
      .sideOutputLateData( lateOutputTag )
      .reduce( new MyReduceFunc() )

这里设置为滚动窗口, 窗口大小 5 秒钟.

    监控流scala处理类
    KeyedStream.scala
import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}

@Public
class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {

  // ------------------------------------------------------------------------
  //  Properties
  // ------------------------------------------------------------------------

  
  @Internal
  def getKeyType = javaStream.getKeyType()


  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = {
    new WindowedStream(javaStream.timeWindow(size))
  }

从这句代码 及 import 语句中, 可以看到:
new WindowedStream(javaStream.timeWindow(size))
WindowsStream的传参中,使用了KeyedStream.java里的: javaStream里的方法,

    监控流java处理类
    KeyedStream.java
@Public
public class KeyedStream extends DataStream {

	
	private final KeySelector keySelector;

	
	private final TypeInformation keyType;

	// ------------------------------------------------------------------------
	//  Windowing
	// ------------------------------------------------------------------------

	
	public WindowedStream timeWindow(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			return window(TumblingProcessingTimeWindows.of(size));
		} else {
			return window(TumblingEventTimeWindows.of(size));
		}
	}
    滚动事件窗口(EventTimeWindow)java处理类
    TumblingEventTimeWindows.java
@PublicEvolving
public class TumblingEventTimeWindows extends WindowAssigner {
	private static final long serialVersionUID = 1L;

	private final long size;

	private final long offset;

	protected TumblingEventTimeWindows(long size, long offset) {
		if (Math.abs(offset) >= size) {
			throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
		}

		this.size = size;
		this.offset = offset;
	}

	@Override
	public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		if (timestamp > Long.MIN_VALUE) {
			// Long.MIN_VALUE is currently assigned when no timestamp is present
			long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
			return Collections.singletonList(new TimeWindow(start, start + size));
		} else {
			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
					"'DataStream.assignTimestampsAndWatermarks(...)'?");
		}
	}

从分配窗口(assignWindows)方法中, 可以看到窗口的起始位置的计算来自于:
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);

    时间窗口(TimeWindow)java处理类
    TimeWindow.java

进一步在 TimeWindow.java 类中, 可以看到如下方法:

@PublicEvolving
public class TimeWindow extends Window {

	private final long start;
	private final long end;

	public TimeWindow(long start, long end) {
		this.start = start;
		this.end = end;
	}
	
	
	public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
		return timestamp - (timestamp - offset + windowSize) % windowSize;
	}

从该算法中可以看出窗口开始位置的计算公式为:
timestamp - (timestamp - offset + windowSize) % windowSize
在 (timestamp - offset + windowSize) % windowSize 里, 先减去偏移量, 再加上窗口大小, 最后再与窗口大小取模, 该公式可以得出:

    offset与windowSize的运算是避免时间戳取到负值先加上一个窗口大小, 再使用窗口取模, 则可理解为对数据大小的影响范围的贡献可以忽略.(timestamp - offset + windowSize) % windowSize 最终可以理解为对timestamp按窗口取模, 而整体的(timestamp - offset + windowSize) % windowSize可理解为对timestamp去除余数, 获得一个去除余数后的整数,
    作为窗口的起始位置.
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774385.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号