Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。
Checkpoint
这是Flink最重要的一个特性。
Flink基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而Flink则把这个算法发扬光大了。
Spark最近在实现Continue streaming,Continue streaming的目的是为了降低处理的延时,其也需要提供这种一致性的语义,最终也采用了Chandy-Lamport这个算法,说明Chandy-Lamport算法在业界得到了一定的肯定。
State
提供了一致性的语义之后,Flink为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的State API,包括ValueState、ListState、MapState,BroadcastState。
Time
除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。
Window
另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。
2、Flink-Window操作 2.1 为什么需要Window在流处理应用中,数据是连续不断的,有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。
在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。
2.2 Window的分类 2.2.1 按照time和count分类time-window:时间窗口:根据时间划分窗口,如:每xx分钟统计最近xx分钟的数据
count-window:数量窗口:根据数量划分窗口,如:每xx个数据统计最近xx个数据
2.2.2 按照slide和size分类窗口有两个重要的属性: 窗口大小size和滑动间隔slide,根据它们的大小关系可分为:
tumbling-window:滚动窗口:size=slide,如:每隔10s统计最近10s的数据
sliding-window:滑动窗口:size>slide,如:每隔5s统计最近10s的数据
注意:当size 按照上面窗口的分类方式进行组合,可以得出如下的窗口: 1.基于时间的滚动窗口tumbling-time-window--用的较多 2.基于时间的滑动窗口sliding-time-window--用的较多 3.基于数量的滚动窗口tumbling-count-window--用的较少 4.基于数量的滑动窗口sliding-count-window--用的较少 注意: Flink还支持一个特殊的窗口:Session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上个窗口的计算 window/windowAll 方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 window 中, Flink提供了很多各种场景用的WindowAssigner: 如果需要自己定制数据分发策略,则可以实现一个class,继承自 WindowAssigner。 evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter两个方法。 Flink 提供了如下三种通用的 evictor: * CountEvictor 保留指定数量的元素 * TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元素,其中 max_ts 是窗口内时间戳的最大值。 * DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删除一个元素。 trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger, 如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义: * onElement() 每次往 window 增加一个元素的时候都会触发 * onEventTime() 当 event-time timer 被触发的时候会调用 * onProcessingTime() 当 processing-time timer 被触发的时候会调用 * onMerge() 对两个 `rigger 的 state 进行 merge 操作 * clear() window 销毁的时候被调用 上面的接口中前三个会返回一个 TriggerResult, TriggerResult 有如下几种可能的选择: * ConTINUE 不做任何事情 * FIRE 触发 window * PURGE 清空整个 window 的元素并销毁窗口 * FIRE_AND_PURGE 触发窗口,然后销毁窗口 需求 nc -lk 9999 有如下数据表示: 信号灯编号和通过该信号灯的车的数量 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口 数据: 需求1:每5秒钟统计一次,最近5秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滚动窗口 代码实现: 运行结果: 需求2:每5秒钟统计一次,最近10秒钟内,各个路口通过红绿灯汽车的数量--基于时间的滑动窗口 代码实现: 运行结果: 分析运行结果: 滑动窗口:10s一个窗口,5s统计一次,如果连续输入了,结果回变多;如果不输入了,滑动窗口时,窗口里的数据会变少,所以10s内的数据就统计就会变少了。 需求: 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 需求1:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现5次进行统计--基于数量的滚动窗口 代码实现: 运行结果: 运行结果解释: 基于countWindow的滚动窗口,当keyBy后的结果达到代码中设置的滚动窗口大小,即相同的key达到size数量,就会触发sink(这里是直接将结果打印出来)。 需求2:统计在最近5条消息中,各自路口通过的汽车数量,相同的key每出现3次进行统计--基于数量的滑动窗口 代码实现: 运行结果: 需求: 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算 代码实现: 运行结果: 运行结果解析: 设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口得有数据!) 在Flink的流式处理中,会涉及到时间的不同概念,如下图所示: 事件时间EventTime: 事件真真正正发生产生的时间 摄入时间IngestionTime: 事件到达Flink的时间 处理时间ProcessingTime: 事件真正被处理/计算的时间 问题: 上面的三个时间,我们更关注哪一个? 答案: 更关注事件时间 ! 因为: 事件时间更能反映事件的本质! 只要事件时间一产生就不会变化 假设,你正在去往地下停车场的路上,并且打算用手机点一份外卖。选好了外卖后,你就用在线支付功能付款了,这个时候是11点59分。恰好这时,你走进了地下停车库,而这里并没有手机信号。因此外卖的在线支付并没有立刻成功,而支付系统一直在Retry重试“支付”这个操作。 当你找到自己的车并且开出地下停车场的时候,已经是12点01分了。这个时候手机重新有了信号,手机上的支付数据成功发到了外卖在线支付系统,支付完成。 在上面这个场景中你可以看到, 支付数据的事件时间是11点59分,而支付数据的处理时间是12点01分 问题: 如果要统计12之前的订单金额,那么这笔交易是否应被统计? 答案: 应该被统计,因为该数据的真真正正的产生时间为11点59分,即该数据的事件时间为11点59分, 事件时间能够真正反映/代表事件的本质! 所以一般在实际开发中会以事件时间作为计算标准 一条错误日志的内容为: 2020-11:11 22:59:00 error NullPointExcep --事件时间 进入Flink的时间为2020-11:11 23:00:00 --摄入时间 到达Window的时间为2020-11:11 23:00:10 --处理时间 问题: 对于业务来说,要统计1h内的故障日志个数,哪个时间是最有意义的? 答案: EventTime事件时间,因为bug真真正正产生的时间就是事件时间,只有事件时间才能真正反映/代表事件的本质! 某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。 A用户在 11:01:00 对 App 进行操作,B用户在 11:02:00 操作了 App, 但是A用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到B用户的消息,然后再接受到A用户的消息,消息乱序了。 问题: 如果这个是一个根据用户操作先后顺序,进行抢购的业务,那么是A用户成功还是B用户成功? 答案: 应该算A成功,因为A确实比B操作的早,但是实际中考虑到实现难度,可能直接按B成功算 也就是说,实际开发中希望基于事件时间来处理数据,但因为数据可能因为网络延迟等原因,出现了乱序,按照事件时间处理起来有难度! 在实际环境中,经常会出现,因为网络原因,数据有可能会延迟一会才到达Flink实时处理系统。我们先来设想一下下面这个场景: 原本应该被该窗口计算的数据因为网络延迟等原因晚到了,就有可能丢失了 实际开发中我们希望基于事件时间来处理数据,但因为数据可能因为网络延迟等原因,出现了乱序或延迟到达,那么可能处理的结果不是我们想要的甚至出现数据丢失的情况,所以需要一种机制来解决一定程度上的数据乱序或延迟到底的问题!也就是我们接下来要学习的Watermaker水印机制/水位线机制 Watermaker就是给数据再额外的加的一个时间列 也就是Watermaker是个时间戳! Watermaker = 数据的事件时间 - 最大允许的延迟时间或乱序时间 注意:后面通过源码会发现,准确来说: Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 这样可以保证Watermaker水位线会一直上升(变大),不会下降 之前的窗口都是按照系统时间来触发计算的,如: [10:00:00 ~ 10:00:10) 的窗口, 一但系统时间到了10:00:10就会触发计算,那么可能会导致延迟到达的数据丢失! 那么现在有了Watermaker,窗口就可以按照Watermaker来触发计算! 也就是说Watermaker是用来触发窗口计算的! 窗口计算的触发条件为: 因为前面说到 Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 也就是说只要不断有数据来,就可以保证Watermaker水位线是会一直上升/变大的,不会下降/减小的 所以最终一定是会触发窗口计算的 注意: 上面的触发公式进行如下变形: Watermaker >= 窗口的结束时间 Watermaker = 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 当前窗口的最大的事件时间 - 最大允许的延迟时间或乱序时间 >= 窗口的结束时间 当前窗口的最大的事件时间 >= 窗口的结束时间 + 最大允许的延迟时间或乱序时间 有订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额) 要求每隔5s,计算5秒内,每个用户的订单总金额 并添加Watermaker来解决一定程度上的数据延迟和数据乱序问题。 注意:一般我们都是直接使用Flink提供好的BoundedOutOfOrdernessTimestampExtractor Generating Watermarks | Apache FlinkGenerating Watermarks # In this section you will learn about the APIs that Flink provides for working with event time timestamps and watermarks. For an introduction to event time, processing time, and ingestion time, please refer to the introduction to event time.Introduction to Watermark Strategies # In order to work with event time, Flink needs to know the events timestamps, meaning each element in the stream needs to have its event timestamp assigned.https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/代码实现: WaterMarker的源码: 自己实现Watermarker的代码:
2.3.2 WindowAssigner
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4
package ljxwtl.cn.window;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class FlinkStreamWindowWithTumblingProcessingTimeWindows {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStream
package ljxwtl.cn.window;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class FlinkStreamWindowWithSlidingProcessingTimeWindows {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStream
package ljxwtl.cn.window;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
public class FlinkStreamWindowWithCountTumblingProcessing {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStream
package ljxwtl.cn.window;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
public class FlinkStreamWindowWithCountSlidingProcessing {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStream
package ljxwtl.cn.window;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class FlinkStreamWindowWithSessionWindow {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStream
package ljxwtl.cn.watermarker;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class FlinkStreamWindowWithTumblingEventTime {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStream
3.4.4 代码实现-2-验证版
package org.apache.flink.api.common.eventtime;
import org.apache.flink.annotation.Public;
import java.time.Duration;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@Public
public class BoundedOutOfOrdernessWatermarks
package ljxwtl.cn.watermarker;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
public class FlinkStreamWindowWithTumblingEventTimeReal {
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
DataStream



