栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Fink DataStream 常用API

Fink DataStream 常用API

Fink DataStream 常用API
  • 一、DataSource
  • 二、Transformation
  • 三、Sink

Flink DataStream 常用API主要分为3部分:

  • DataSource:是程序的数据源的输入,可以自定义数据源;
  • transformation:具体的操作,对一个或多个输入进行计算处理;
  • Sink:程序的输出,将Transformation处理之后的数据输出到指定的存储介质中;
一、DataSource

1、来源

DataSource方法描述
基于文件readTextFile读取文本文件,Exactly-once
基于SocketsocketTextStream从Socket中读取数据,元素可以通过一个分隔符分开,Exactly-once
基于集合fromCollection通过java的Collection集合创建一个数据流,集合中的所有元素必须是相同的类型 ,At-most-once
自定义输入addSource实现读取第三方数据源的数据,Exactly-once

2、自定义数据源的实现
有三种方式:
1)通过实现SourceFuntion接口来自定义无并行度(并行度为1)的数据源
2)通过实现ParallelSourceFunction接口
3)继承RichParallelSourceFunction来自定义有并行度的数据源
3、示例代码,

二、Transformation

Flink针对DataStream提供了大量的算子:

  1. Map ,输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作;
  2. FlatMap ,输入一个元素,可以返回零个、一个或多个元素;
  3. Filter ,过滤函数,对传入的数据进行判断,符合条件的数据保留;
  4. keyBy ,根据指定的Key进行分组,Key相同的数据会进入同一个分区;
  5. Reduce ,对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新值;
  6. Aggregations ,聚合函数,例如:sum(),min(),max()等;
  7. Union ,合并多个流,新的流会包含所有流中的数据,但是Union有一个限制,就是所有合并的流类型必须是一致的;
  8. Connect ,和Union类型,但是只能链接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法;
  9. coMap和coFlatMap ,在ConnectedStream中需要使用这两个函数,类似Map和FlatMap;
  10. Split ,根据规则把一个数据流切分为多个数据流;
  11. Select ,和Split配合使用,选择切分后的流;

Flink针对DataStream提供了一些数据分区规则:

  1. Random partition ,随机分区。DataStream.shuffle();
  2. Rebalancing ,对数据集进行再平衡、重分区和消除数据倾斜。DataStream.rebalance();
  3. Rescaling,重新调节。DataStream.rescale()。Rebalancing和Rescaling的区别Rebalancing会产生全量重分区,而Rescaling不会。
  4. Custom partitioning,自定义分区。实现Partitioner接口。
    DataStream.partitionCustom(partitioner,“someKey”);
    或者
    DataStream.partitionCustom(partitioner,0);
三、Sink

Flink针对DataStream提供大量的已经实现的数据目的地Sink:

  1. writeAsText() ,将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString方法来获取;
  2. print() / printToError() ,打印每个元素的toString() 方法的值到标准输出或者标准错误输出流中;
  3. 自定义输出,addSink可以实现把数据输出到第三方存储介质中。自定义Sink有两种实现方式:
    1). 实现SinkFunction接口
    2). 继承RichSinkFunction类

示例:使用RedisSink,来实现计算后的数据存放在redis中。源码

public class StreamingDemoToRedis {

    public static void main(String[] args) throws Exception {
        // 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 获取数据源
        DataStreamSource text = env.socketTextStream("node01",9109,"n");
        // 转换
        DataStream> words = text.map(new MapFunction>() {
            @Override
            public Tuple2 map(String value) throws Exception {
                return new Tuple2<>("words",value);
            }
        });
        // 创建redis配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
            .setHost("node03").setPort(6379).build();
        // 创建redisSink
        RedisSink> redisSink = new RedisSink<>(conf,new MyRedisMapper());
        // 打印出来
        words.print();
        // 添加sink
        words.addSink(redisSink);
        // 触发执行
        env.execute("流处理结果写入redis中");
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/671736.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号