栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

flink实战样例

flink实战样例

flink概念介绍

flink源码以及样例等:https://github.com/apache/flink
flink各个版本开发文档:https://nightlies.apache.org/flink/
一:dataStream实战 1:flink的StreamExecutionEnvironment

这StreamExecutionEnvironment是所有 Flink 程序的基础(创建批处理请使用ExecutionEnvironment)。创建一个执行环境,表示当前执行程序的上下文,类似于SparkContext.

1.1:StreamExecutionEnvironment配置运行环境参数

1:StreamExecutionEnvironment的API解释

 protected static void envSetConfig(  LocalStreamEnvironment env) {
        env.setBufferTimeout(1000);
        env.setMaxParallelism(10);//设置最大并行度
        env.setParallelism(8);//设置并行度

        //设置重启策略
        env.enableCheckpointing(5000);//设置重启策略必须先开启enableCheckpointing
        env.setRestartStrategy(new RestartStrategies.FailureRateRestartStrategyConfiguration(5, Time.milliseconds(100),Time.milliseconds(100)));

        //设置状态后端的存储位置。包括内存,文件系统等
        //1:内存
        env.setStateBackend(new MemoryStateBackend());
        //2:文件系统时必须指定checkpoint存储路径
        env.setStateBackend(new FsStateBackend("C:\Users\Administrator.SC-201905261418\Desktop\testData\flink"));

        //设置时间特性,用于窗口和水印使用
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    }
2:容错checkpoint

为了保证task任务或者算子执行过程中的失败能够恢复,启用检查点存储算子的执行状态(快照方式)。失败时从最新的快照进行恢复。
相关的配置参数如下:

env.enableCheckpointing(1000);
        // advanced options:
        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 设置checkpoint的最小间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 设置一次checkpoint完成的最长时间,超过取消此次checkpoint
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        //只允许一个检查点同时进行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 启用作业取消后保留的外部化检查点
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
3:状态

在Java/Scala API上可以通过 stream.keyBy(…) 得到 KeyedStream,在Python API上可以通过 stream.key_by(…) 得到 KeyedStream。

3.1:Keyed State

所有支持的状态类型如下所示:

ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过 update(T) 进行更新,通过 T value() 进行检索。

ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterable get() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。

ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与 ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。

AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合。和 ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与 ListState 类似,但使用 add(IN) 添加的元素会用指定的 AggregateFunction 进行聚合。

MapState: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map) 添加映射。 使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。你还可以通过 isEmpty() 来判断是否包含任何键值对。

状态必须通过RichFunction函数才可以创建,创建StateDescriptor,才能得到对应的状态句柄。 这保存了状态名称、状态所持有值的类型。

使用示例:

  stream.keyBy(s -> s).flatMap(new RichFlatMapFunction() {
            //状态作为实例变量进行定义
            private ValueState vlaueSateTest;

            @Override //open方法只执行一次
            public void open(Configuration parameters) throws Exception {
                //声明状态:StateDescriptor 包含状态名称和有关状态所存值的类型

                //1:声明valuestate
                ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>(
                        "vlaueSateTest", //状态的名字。必须保证唯一性,后续通过name获取
                        Integer.class); //存储值的类型。
                //可以设置状态的过期时间等。
                //StateTtlConfig用于配置状态相关参数
                StateTtlConfig build = StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.minutes(1)).cleanupFullSnapshot().build();

                stateDescriptor.enableTimeToLive(build);

                //获取状态必须从getRuntimeContext进行get
                this.vlaueSateTest = getRuntimeContext().getState(stateDescriptor);
                //2:声明
                ListState listStateTest = getRuntimeContext().getListState(new ListStateDescriptor("listStateTest", ValueTypeInfo.INT_VALUE_TYPE_INFO));
            }

            @Override
            public void flatMap(Integer integer, Collector collector) throws Exception {
                vlaueSateTest.update(integer);//更新状态存的值
				//当满足某个条件时调用clear()清除状态。
            }

            @Override
            public void close() throws Exception {
            vlaueSateTest.clear();
            }
        }).print();
 
3.2:Operator State 

每个算子状态绑定到一个并行算子实例,作用范围限定为算子任务,同一并行任务的状态是共享的,并行处理的所有数据都可以访问到相同的状态。Kafka Connector就是使用算子状态的很好的一个例子,Kafka consumer的每个并行实例都维护一个主题分区和偏移,作为算子状态。当并行性发生变化时,算子状态接口支持在并行运算符实例之间重新分配状态。可以有不同的方案来进行这种再分配。

因为同一个并行任务处理的所有数据都可以访问到当前的状态,所以就相当于本地变量

3.3:状态存储方式

默认情况下,状态保存在TaskManagers的内存中,检查点存储在JobManager的内 存中。检查点的保存位置由配置的配置状态后台存储位置决定。
Flink 为 state 提供了三种开箱即用的后端存储方式(state backend):

Memory State Backend
File System (FS) State Backend
RocksDB State Backend

三种状态下的checkpoint存储位置如下:

设置状态后台

StreamExecutionEnvironment.setStateBackend(…)
1:MemoryStateBackend

MemoryStateBackend 将工作状态数据保存在 taskmanager 的 java 内存中。key/value 状态和 window 算子使用哈希表存储数值和触发器。进行快照时(checkpointing),生成的快照数据将和 checkpoint ACK 消息一起发送给 jobmanager,jobmanager 将收到的所有快照保存在 java 内存中。
MemoryStateBackend 现在被默认配置成异步的,这样避免阻塞主线程的 pipline 处理。
MemoryStateBackend 的状态存取的速度都非常快,但是不适合在生产环境中使用。

这是因为 MemoryStateBackend 有以下限制:

每个 state 的默认大小被限制为 5 MB(这个值可以通过 MemoryStateBackend 构造函数设置)
每个 task 的所有 state 数据 (一个 task 可能包含一个 pipline 中的多个 Operator) 大小不能超过 RPC 系统的帧大小(akka.framesize,默认 10MB)
jobmanager 收到的 state 数据总和不能超过 jobmanager 内存

MemoryStateBackend 适合的场景:
本地开发和调试
状态很小的作业

下图表示了状态存储位置

2:FsStateBackend

FsStateBackend 需要配置一个 checkpoint 路径,例如“hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”,我们一般配置为 hdfs 目录
FsStateBackend 将工作状态数据保存在 taskmanager 的 java 内存中。进行快照时,再将快照数据写入上面配置的路径,然后将写入的文件路径告知 jobmanager。jobmanager 中保存所有状态的元数据信息(在 HA 模式下,元数据会写入 checkpoint 目录)。

FsStateBackend 默认使用异步方式进行快照,防止阻塞主线程的 pipline 处理。可以通过 FsStateBackend 构造函数取消该模式:

new FsStateBackend(path, false);

FsStateBackend 适合的场景:
大状态、长窗口、大键值(键或者值很大)状态的作业
适合高可用方案 3:RocksDBStateBackend

RocksDBStateBackend 也需要配置一个 checkpoint 路径,例如:“hdfs://namenode:40010/flink/checkpoints” 或者 “file:///data/flink/checkpoints”,一般配置为 hdfs 路径。
RocksDB 是一种可嵌入的持久型的 key-value 存储引擎,提供 ACID 支持。由 Facebook 基于 levelDB 开发,使用 LSM 存储引擎,是内存和磁盘混合存储。
RocksDBStateBackend 将工作状态保存在 taskmanager 的 RocksDB 数据库中;checkpoint 时,RocksDB 中的所有数据会被传输到配置的文件目录,少量元数据信息保存在 jobmanager 内存中( HA 模式下,会保存在 checkpoint 目录)。
RocksDBStateBackend 使用异步方式进行快照。
RocksDBStateBackend 的限制:

由于 RocksDB 的 JNI bridge API 是基于 byte[] 的,RocksDBStateBackend 支持的每个 key 或者每个 value 的最大值不超过 2^31 bytes((2GB))。
要注意的是,有 merge 操作的状态(例如 ListState),可能会在运行过程中超过 2^31 bytes,导致程序失败。
RocksDBStateBackend 适用于以下场景:

超大状态、超长窗口(天)、大键值状态的作业
适合高可用模式
使用 RocksDBStateBackend 时,能够限制状态大小的是 taskmanager 磁盘空间(相对于 FsStateBackend 状态大小限制于 taskmanager 内存 )。这也导致 RocksDBStateBackend 的吞吐比其他两个要低一些。因为 RocksDB 的状态数据的读写都要经过反序列化/序列化。

RocksDBStateBackend 是目前三者中唯一支持增量 checkpoint 的。

3.4:查询状态 4:算子

transformation算子:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/operators/overview/

转换操作:如map等

map::采用一个数据元并生成一个数据元.如数据值*2
flatmap:采用一个数据元并生成零个,一个或多个数据元。如将句子分割为单词的 flatmap函数
Filter DataStream→DataStream 描述:计算每个数据元的布尔函数,并保存函数返回true的数据元。

Window KeyedStream→WindowedStream 描述:可以在已经分区的KeyedStream上定义Windows。
		Windows根据某些特征 (例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。
		有关窗口的完 整说明,请参见windows。示例如下: // Last 5 seconds of data
		dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seco nds(5)));
		
WindowAll DataStream→AllWindowedStream 描述:Windows可以在常规DataStream上定义。
		Windows根据某些特征(例如, 在最后5秒内到达的数据)对所有流事件进行分组。
		有关窗口的完整说明,请参见 windows。警告:在许多情况下,这是非并行转换。
		所有记录将收集在windowAll 算子的一个任务中。
		dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5) ));

Window Apply WindowedStream→DataStream AllWindowedStream→DataStream 
		描述:将一般函数应用于整个窗口。
Window Reduce WindowedStream→DataStream 描述:将函数缩减函数应用于窗口并返回缩小的值。

Window Fold WindowedStream→DataStream
		算子描述:将函数折叠函数应用于窗口并返回折叠值。示例函数应用于序列
	   (1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”:
广播 DataStream→DataStream 描述:向每个分区广播数据元。dataStream.broadcast()
4.1:window和WindowAll 的区别

window必须在keyby分区后的数据才能使用
WindowAll 对于dataStream可以直接使用,适用于所有数据。

5:时间和水印 5.1:时间特性 5.2:水印

水印就是衡量数据处理进度的一个时间戳。可以通过水印保证水印前数据的有序性。

每当元素进入到Flink,会根据其事件时间,生成一个新的时间戳,即水印。对于t时间的水印,意味着Flink不会再接收t之前的数据,那么t之前的数据就可以进行排序产出顺序流了。

对于水印中迟到数据的处理,flink允许对迟到数据的处理。

1:使用水印

stream.assignTimestampsAndWatermarks(..)
5:窗口

在流式数据中,数据是连续的,通常是无限的,对流中的所有元素进行计数是不可能的,所以在流上的聚合需要由window来划定范围,例如过去五分钟内用户浏览量的计算或者最后100个元素的和。window就是一种可以把无限数据切割为有限数据块的手段。

窗口可以由时间或者数量来做区分

1.根据时间进行截取,比如每10分钟统计一次,即时间驱动的[Time Window]2.根据消息数量进行统计,比如每100个数据统计一次,即数据驱动[Count Window]

按照窗口聚合的种类可以大致分为:

1:滚动窗口:没有数据重叠,比如统计每分钟的浏览量,TumblingEventTimeWindows.of(Time.minutes(1))2:滑动窗口:比如每10秒钟统计一次一分钟内的浏览量,SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))3:会话窗口:统计会话内的浏览量,会话的定义是同一个用户两次访问不超过30分钟,EventTimeSessionWindows.withGap(Time.minutes(30))4:全局窗口:global window + trigger 一起配合才能使用

窗口的时间可以通过下面的几种时间单位来定义:

毫秒,Time.milliseconds(n)
秒,Time.seconds(n)
分钟,Time.minutes(n)
小时,Time.hours(n)
天,Time.days(n)

示例

//滚动窗口 stream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(2)))
//滑动窗口 stream.keyBy(0).window(SlidingProcessingTimeWindows.of(Time.seconds(6),Time.seconds(4)))
//会话窗口 stream.keyBy(0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
//全局窗口 stream.keyBy(0).window(GlobalWindows.create()) //如果不加这个程序是启动不起来的               
						.trigger(CountTrigger.of(3)) .sum(1) .print();
二:dataSet实战

由ExecutionEnvironment.ge tExecutionEnvironment()获取上下文环境进行加载数据
wordcount示例:

  public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet text = env.fromElements("Who's there?", "I think I hear them. Stand, ho! Who's there?");
        DataSet> wordCounts = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
        wordCounts.print();

    }

    public static class LineSplitter implements FlatMapFunction> {
        @Override
        public void flatMap(String line, Collector> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2(word, 1));
            }
        }
    }
2:问题记录 1:Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/flink/streaming/api相关类

明明flink的相关依赖已经加到pom文件中,依赖也已经导入,但是执行main函数还是报相关的类没有。
解决:
在项目的pom.xml中注释掉
provided 去掉即可。注释为

2:窗口函数使用报:Is the time characteristic set to ‘ProcessingTime’, or did you forget to call ‘DataStream.assignTimestampsAndWatermarks(…)’?

1:默认时间特性为处理时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
使用处理时间时调用窗口函数Processing相关的窗口函数即可解决,比如

 WindowedStream, Integer, TimeWindow> window = map.keyBy(obj -> obj.f1).window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

2:设置时间特性为事件时间,并且定义水印
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); Dstream.assignTimestampsAndWatermarks
才可以使用TumblingEventTimeWindows函数

转载请注明:文章转载自 www.mshxw.com
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号