栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink学习之DataStream API(python版本)

Flink学习之DataStream API(python版本)

今天我们来学习flink中较为基础的DataStream API,DataStream API用来处理流数据。本文主要是以pyflink的形式来进行讲解,对往期内容感兴趣的小伙伴:

hadoop专题: hadoop系列文章.spark专题: spark系列文章.flink专题: Flink系列文章.

本博客的API都是python的,根据流数据处理的不同阶段,去官方的pyflink文档中寻找对应的python API 总结而成,如有遗漏的地方,请大家指正。

目录

1. 安装pyflink2. DataStream API

2.1 DataSources数据输入2.2 DataSteam转换操作2.3 DataSinks数据输出 3. DataSet4. 参考资料

1. 安装pyflink

Flink支持python3.6、3.7和3.8,同时Flink1.11以后也支持windows系统了,大家只要直接运行命令即可安装。

#安装命令
python3 -m pip install apache-flink -i https://pypi.tuna.tsinghua.edu.cn/simple/

我是在ubuntu中安装的,记得安装java8或11哦,出现如下界面即成功了。

2. DataStream API

DataStream API是Flink框架处理无界数据流的重要接口。前面提到,任何一个完整的Flink应用程序应该包含如下三个部分:

数据源(DataSource)。转换操作(Transformation)。数据汇(DataSink)。 2.1 DataSources数据输入

    从文件读取数据
env.read_text_file(file_path: str, charset_name: str = 'UTF-8')

    从集合Collection中读取数据
env.from_collection(collection: List[Any], type_info: pyflink.common.typeinfo.TypeInformation = None)
    自定义数据源
env.add_source(source_func: pyflink.datastream.functions.SourceFunction, source_name: str = 'Custom Source', type_info: pyflink.common.typeinfo.TypeInformation = None)
    还支持其他的数据源,上面几种较为常见。
2.2 DataSteam转换操作

当Flink应用程序生成数据源后,就需要根据业务需求,通过一系列转换操作对数据流上的元素进行各种计算,从而输出最终的结果。

    map

有时候,我们需要对数据流上的每个元素进行处理,比如将单个文本转换成一个元组,即1对1的转换操作,此时可以通过map转换操作完成。

datastreamsource.map(func, output_type) 
#Parameters
#func – The MapFunction that is called for each element of the DataStream.
#output_type – The type information of the MapFunction output data.
#Returns
#The transformed DataStream.
    flat_map

在某些情况下,需要对数据流中每个元素生成多个输出,即1对N的转换操作,那么此时可以利用flatMap操作。

datastreamsource.flat_map(func, output_type) 
#Parameters
#func – The FlatMapFunction that is called for each element of the DataStream.
#output_type – The type information of output data.
#Returns
#The transformed DataStream.
    fliter

有时要从数据流中筛选出符合预期的数据,那就需要对数据流进行过滤处理,即利用filter转换操作。

datastreamsource.filter(func) 
#Parameters
#func – The FilterFunction that is called for each element of the DataStream.
#Returns
#The filtered DataStream.
    key_by

针对不同的数据流元素,有时需要根据某些字段值,作为分区的Key来并行处理数据,此时就需要用到keyBy转换操作。它将一个DataStream类型的数据流转换成一个KeyedStream数据流类型

datastreamsource.key_by(key_selector,key_type) 
#Parameters
#key_selector – The KeySelector to be used for extracting the key for partitioning.
#key_type – The type information describing the key type.
#Returns
#The DataStream with partitioned state(i.e. KeyedStream).
    reduce

对于分区的数据流,对数据进行reduce处理,它实际上是一种聚合操作,将两个输入元素合并成一个输出元素。它是KeyedStream流上的操作

datastreamsource.reduce(func)
#Parameters
#func – The ReduceFunction that is called for each element of the DataStream.
#Returns
#The transformed DataStream.

例如:

ds = env.from_collection([(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b'])
ds.key_by(lambda x: x[1]).reduce(lambda a, b: a[0] + b[0], b[1])
    union

在流操作场景中,有时需要合并多个流,即将多个数据流合并成一个数据流,此时可以使用union转换操作(最多合并3个)

#流1合并2,3
datastreamsource1.union(datastreamsource2,datastreamsource3) 
#Parameters
#datastreamsource – The DataStream to union outputwith.
#Returns The DataStream.

    connect

除了union可以合并流,还可以使用connect对2个数据流进行合并,且两个流的数据类型可以不相同。

datastreamsource.connect(ds)
#Parameters
#ds – The DataStream with which this stream will be connected.
#Returns
#The ConnectedStreams.
    project
#dataStreamSource.project(1, 0)方法从数据源dataStreamSource中筛选出2个字段,其字段索引分别是1和0,此时列也重新进行排序。
datastreamsource.project(*field_indexes: int) 
#Parameters
#field_indexes – The field indexes of the input tuples that are retained. The order of fields in the output tuple corresponds to the order of field indexes.
#Returns
#The projected DataStream.
    partition_custom

partition_custom转换操作可以根据自身需要,自行制定分区规则,partitionCustom只能对单个Key进行分区,不支持复合Key。

datastreamsource.partition_custom(partitioner, key_selector) 
#Parameters
#partitioner – The partitioner to assign partitions to keys.
#key_selector – The KeySelector with which the DataStream is partitioned.
#Returns
#The partitioned DataStream.

    window转换操作

Flink通过window机制,将无界数据流划分成多个有界的数据流,从而对有界数据流进行数据统计分析,window上还有多种转换操作,如max求窗口最大值,sum求窗口中元素和等。当窗口中的内置转换操作不能满足业务需求时,可以自定义内部的处理逻辑,即用apply方法传入一个自定义的WindowFunction

#CountWindow将datastream分成几个窗口
datastreamsource.CountWindow(id: int)
2.3 DataSinks数据输出

当数据流经过一系列的转换后,需要将计算结果进行输出,那么负责输出结果的算子称为Sink。

    sink_to
datastreamsource.sink_to(sink: pyflink.datastream.connectors.Sink) 
#Adds the given sink to this DataStream. only streams with sinks added will be executed once the execute() method is called.

#Parameters
#sink – The user defined sink.
#Returns
#The closed DataStream.
    add_sink
datastreamsource.add_sink(sink_func: pyflink.datastream.functions.SinkFunction) 
#Adds the given sink to this DataStream. only streams with sinks added will be executed once the StreamExecutionEnvironment.execute() method is called.

#Parameters
#sink_func – The SinkFunction object.
#Returns
#The closed DataStream.
3. DataSet

上面的部分,我们主要讲述了流处理DataStream的DataSource数据源、DataStream转换操作以及DataSink数据汇,在Flink中将批数据称为DataSet,关于批数据的处理总结如下:

数据源:和DataStream相似转换操作:参考spark的批处理api数据汇:和DataStream相似

DataSet在这里就不做过多讲述。

4. 参考资料

《PyDocs》(pyflink官方文档)
《Flink入门与实战》
《Kafka权威指南》
《Apache Flink 必知必会》
《Apache Flink 零基础入门》
《Flink 基础教程》

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/741811.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号