栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink的时间语义和Watermark

Flink的时间语义和Watermark

时间语义:

事件处理会经过几个特殊时间:

Event Time:事件创建的时间 Ingestion Time:数据进入Flink的时间 Processing Time:执行操作算子的本地系统时间,与机器相关 设置时间语义: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置事件时间语义 水位线(Watermark):         乱序数据的影响

当 Flink 以 Event Time 模式处理数据流时,它会根据数据里的时间戳来处理基于时间的算子 由于网络、分布式等原因,会导致乱序数据的产生,乱序数据会让窗口计算不准确

定义 怎样避免乱序数据带来计算不正确? ➢ 遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口 • Watermark 是一种衡量 Event Time 进展的机制,可以设定延迟触发 • Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 window 来实现; • 数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此window 的执行也是由 Watermark 触发的。 • watermark 用来让程序自己平衡延迟和结果正确性

watermark 的特点

1:watermark 是一条特殊的数据记录 2:watermark 必须单调递增,以确保任务的事件时间时钟在向前推进,而不是在后退 3:watermark 与数据的时间戳相关

watermark 的传递

每个分区都有自己的watermark,当上游不同分区合并进入下游同一个分区时, 下游会选取上游最小watermark进行广播。

watermark设定 在 Flink 中,watermark 由应用程序开发人员生成,这通常需要对相应的领域有一定的了解 • 如果watermark设置的延迟太久,收到结果的速度可能就会很慢,解决办法是在水位线到达之前输出一个近似结果 • 而如果watermark到达得太早,则可能收到错误结果,不过 Flink 处理迟到数据的机制可以解决这个问题 案例
package com.atguigu.window;

import com.atguigu.bean.SensorReading;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;

public class EventTimeWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置事件时间语义
        env.getConfig().setAutoWatermarkInterval(200);  //周期性的watermark是200毫秒

        DataStreamSource socketTextStream = env.socketTextStream("hadoop112", 7777);

        DataStream inputStream = socketTextStream.map(value -> {
            String[] split = value.split(",");
            return new SensorReading(split[0], new Long(split[1]), new Double(split[2]));
        })
//               .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
//                   @Override
//                   public long extractAscendingTimestamp(SensorReading element) {
//                       return element.getTimestamp() * 1000L;
//                   }
//               })    //升序,不延迟
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(1)) {
            @Override
            public long extractTimestamp(SensorReading element) {
                return element.getTimestamp() * 1000L;
            }
        });   //乱序,延迟


        
        OutputTag outputTag = new OutputTag("late");
        SingleOutputStreamOperator minBy = inputStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(outputTag)
                .minBy("temperature");
        minBy.print("minby");
        minBy.getSideOutput(outputTag).print("late");


        env.execute();
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/582750.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号