栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink -没写完更新中

Flink -没写完更新中

文章目录

前言一、概述

1 Flink是什么2 架构分层3 基本组件4 其他流式计算框架+ 二、入门与使用

1 Flink基本安装

1.1 Linux1.2 Java1.3 Scala+ 2 常用API

2.1 DataStream 流处理

DataSourceTransformationSink示例一:自定义数据源(SourceFunction)示例二:自定义分区示例三:NettyServer作为数据源示例四:RabbitMQ作为数据源示例五:待定 2.2 DataSet 批处理2.3 Table API/SQL2.4 关于序列化 3 集群模式 三、高阶功能使用四、原理解析总结

前言

提示:这里可以添加本文要记录的大概内容,例如:我是一个帅哥,你懂吧?

文献
《Flink入门与实战》 - 徐葳
/
一、概述 1 Flink是什么

​ Apache Flink,内部是用Java及Scala编写的分布式流数据计算引擎,可以支持以批处理或流处理的方式处理数据,在2014年这个项目被Apache孵化器所接受后,Flink迅速成为ASF(ApacheSoftware Foundation)的顶级项目之一,在2019年1月,阿里巴巴集团收购了Flink创始公司(DataArtisans),打造了阿里云商业化的实时计算Flink产品。

它有如下几个特点

    低延迟高吞吐支持有界数据/无界数据的处理,数据流式计算支持集群,支持HA,可靠性强

什么是有界数据/无界数据?

有界数据:数据是有限的,一条SELECt查询下的数据不会是源源不断的无界数据:数据源源不断,不知道为什么时候结束,例如监控下的告警

2 架构分层
名称描述
Deploy 部署方式本地/集群/云服务部署。
Core 分布式流处理模型计算核心实现,为API层提供基础服务。
API 调用接口提供面向无界数据的流处理API及有界数据的批处理API,其中流处理对应DataStream API,批处理对应DataSet API。
Library 应用层提供应用计算框架,面向流处理支持CEP(复杂事件处理)、基于SQL-like的操作(基于Table的关系操作),面向批处理支持FlinkML(机器学习库)、Gelly(图处理)、Table 操作。
3 基本组件

一个Flink任务 = DataSource + Transformation + DataSink

DataSource :数据源

Transformation :数据处理

DataSink:计算结果输出

而Flink在网络传输中通过缓存块承载数据,可以通过设置缓存块的超时时间,变相的决定了数据在网络中的处理方式。

4 其他流式计算框架+

1

1

1

1

1

二、入门与使用 1 Flink基本安装 1.1 Linux
下载链接
Index of /dist/flink/flink-1.14.3 (apache.org)

首先去apache官网下载部署的软件包,下载完成之后进行解压

## 解压
tar -zxvf flink-1.14.3-bin-scala_2.12.tgz 
## 进入bin目录 启动
./start-cluster.sh
## Flink提供的WebUI的端口是8081 此时可以去看看是否启动完成
netstat -anp |grep 8081

接着通过页面访问8081端口来个初体验

关于Linux下的Flink Shell终端的使用

文章目录
flink~使用shell终端_cai_and_luo的博客-CSDN博客
1.2 Java
文章目录
Flink入门之Flink程序开发步骤(java语言)_胖虎儿的博客-CSDN博客

导入依赖


    org.apache.flink
    flink-java
    1.14.3



    org.apache.flink
    flink-streaming-java_2.12
    1.14.3



    org.apache.flink
    flink-clients_2.12
    1.14.3



    org.apache.flink
    flink-core
    1.14.3

入门Demo

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class DemoApplication {
    public static void main(String[] args) throws Exception {

        

        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置运行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.加载数据源
        DataStreamSource elementsSource = env.fromElements("java,scala,php,c++",
                "java,scala,php", "java,scala", "java");
        // 3.数据转换
        DataStream flatMap = elementsSource.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String element, Collector out) throws Exception {
                String[] wordArr = element.split(",");
                for (String word : wordArr) {
                    out.collect(word);
                }
            }
        });
        // DataStream 下边为DataStream子类
        SingleOutputStreamOperator source = flatMap.map(new MapFunction() {
            @Override
            public String map(String value) throws Exception {
                return value.toUpperCase();
            }
        });
        // 4.数据输出
        source.print();
        // 5.执行程序
        env.execute();
    }
}

关于在设置运行模式的代码上,有三种选择

@PublicEvolving
public enum RuntimeExecutionMode {

    
     
    STREAMING,

    
     
    BATCH,

    
    
    AUTOMATIC
}
1.3 Scala+

与Java一样都在IDEA编译器上做,此时引入依赖


    org.apache.flink
    flink-scala_2.12
    1.14.3



    org.apache.flink
    flink-streaming-scala_2.12
    1.14.3



    org.apache.flink
    flink-clients_2.12
    1.14.3



    org.apache.flink
    flink-core
    1.14.3

// …

待定 …

// …

2 常用API

第一次学时,光看上面的Demo例子比较难以理解,所以通过书下面的API内容对照上面的Demo来进行理解,先来了解Flink四种层次的API详情

层级描述信息备注
底层 API偏底层,易用性比较差,提供时间/状态的细粒度控制Stateful Stream Processing
核心 API对有界/无界数据提供处理方法DataStream(流处理) / DataSet(批处理)
Table API/声明式DSL
SQL/高级语言

2.1 DataStream 流处理

主要分为三个流程

    DataSource 数据输入:addSource(sourceFunction)为程序添加一个数据源。Transformation 数据处理:对一个或多个数据源进行操作。Sink 数据输出:通过Transformation 处理后的数据输出到指定的位置。

DataSource

看看他们的API

DataSource API描述
readTextFile(文件路径)逐行读取文本文件的数据
socketTextStream(地址信息)从socket中读取数据
fromCollection(集合数据)从集合内获取数据
其他第三方输入数据…或者自定义数据源通过Flink提供的内置连接器去链接其它数据源

如果是自定义数据源,有两种实现方式

    实现SourceFunction接口(并行度为1 = 无并行度)实现ParallelSourceFunction接口 / 继承RichParallelSourceFunction

什么是并行度?

​ 一个Flink程序由多个任务(Source、Transformation和Sink)组成。一个任务由多个并行实例(线程)来执行,一个任务的并行实例(线程)数目被称为该任务的并行度。

Transformation

接下来是Transformation数据处理,Flink针对DataStream提供了大量的已经实现的算子。

DataStream API描述
Map输入一个元素,然后返回一个元素,中间可以进行清洗转换等操作
FlatMap输入一个元素,可以返回零个、一个或者多个元素
Filter过滤函数,对传入的数据进行判断,符合条件的数据会被留下
KeyBy根据指定的Key进行分组,Key相同的数据会进入同一个分区,典型用法如下:1、DataStream.keyBy(“someKey”) 指定对象中的someKey段作为分组Key。2、DataStream.keyBy(0) 指定Tuple中的第一个元素作为分组Key。
Reduce对数据进行聚合操作,结合当前元素和上一次Reduce返回的值进行聚合操作,然后返回一个新的值
Aggregationssum()、min()、max()等
Union合并多个流,新的流会包含所有流中的数据,但是Union有一个限制,就是所有合并的流类型必须是一致的
Connect和Union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法
coMap和coFlatMap在ConnectedStream中需要使用这种函数,类似于Map和flatMap
Split根据规则把一个数据流切分为多个流
Select和Split配合使用,选择切分后的流

关于Flink针对DataStream提供的一些数据分区规则

分区规则描述
DataStream.shuffle()随机分区
DataStream.rebalance()对数据集进行再平衡、重分区和消除数据倾斜
DataStream.rescale()重新调节
DataStream.partitionCustom(partitioner,0) 或者 DataStream.partitionCustom(partitioner,“smeKey”)自定义分区
Sink

数据处理后的输出

Sink API描述
writeAsText()将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
print() / printToErr()打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
自定义输出addSink可以实现把数据输出到第三方存储介质中。系统提供了一批内置的Connector,它们会提供对应的Sink支持
示例一:自定义数据源(SourceFunction)

第一步,继承SourceFunction接口,实现自定义数据源类

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;


public class DemoTransactionSource implements SourceFunction {
    @Override
    public void run(SourceContext ctx) throws Exception {
        while (true) {
            // 发射元素
            ctx.collect(String.valueOf(new Random().nextInt(50)
            ));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
    }
}

第二步,在Flink代码中引入这个数据源

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;


@Component
public class FlinkInitialize {

    @PostConstruct
    public void starter() throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 设置运行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.设置自定义数据源
        DataStreamSource stringDataStreamSource = env.addSource(new DemoTransactionSource(), "测试用的数据源");

        // 3.数据处理
        SingleOutputStreamOperator stringSingleOutputStreamOperator = stringDataStreamSource.map(new MapFunction() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        });

        // 4.数据输出
        stringSingleOutputStreamOperator.print();

        // 5.执行程序
        env.execute();
    }
}

此时执行代码,就可以把引入的数据进行打印

SourceFunction定义了run和cancel两个方法和SourceContext内部接口。

run(SourceContex):实现数据获取逻辑,并可以通过传入的参数ctx进行向下游节点的数据转发。cancel():用来取消数据源,一般在run方法中,会存在一个循环来持续产生数据,cancel方法则可以使该循环终止。SourceContext:source函数用于发出元素和可能的watermark的接口,返回source生成的元素的类型。

示例二:自定义分区

数据源沿用上述案例的代码,自定义分区是通过实现Partitioner接口去做处理

首先看看自定义分区的实现类

public class DemoPartitioner implements Partitioner {
    @Override
    public int partition(String key, int numPartitions) {
        System.out.println("目前分区总数=" + numPartitions + "  当前值=" + key + "  通过最左边的值看分区号");

        if (new Integer(key) > 20) {
            return 1;
        } else {
            return 2;
        }
    }
}

然后在Flink的代码中体现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Component
public class FlinkInitialize {
    @PostConstruct
    public void starter() throws Exception {
        // 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置运行模式
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 2.设置自定义数据源
        DataStreamSource stringDataStreamSource = env.addSource(new DemoTransactionSource(), "测试用的数据源");
        // 3.数据处理
        DataStream dataStream = stringDataStreamSource.map(new MapFunction() {
            @Override
            public String map(String value) throws Exception {
                return value;
            }
        }).partitionCustom(new DemoPartitioner(), new KeySelector() {
            @Override
            public String getKey(String value) throws Exception {
                return value;
            }
        });
        // 4.数据输出
        dataStream.print();
        // 5.执行程序
        env.execute();
    }
}

输出后的结果如下

示例三:NettyServer作为数据源

第一步:搭建数据来源,这里选择了Netty服务端作为本次示例

import com.sun.org.slf4j.internal.Logger;
import com.sun.org.slf4j.internal.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;


@Component
public class NettyServerInitialize {

    private static Logger log = LoggerFactory.getLogger(NettyServerInitialize.class);
    public static EventLoopGroup bossGroup;
    public static EventLoopGroup workerGroup;
    public static ServerBootstrap serverBootstrap;
    public static ChannelFuture channelFuture;
    public static Boolean isRunning = false;

    static {
        // Server初始化
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup(2);
        serverBootstrap =
                new ServerBootstrap().group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                System.out.println("客户端进入:" + ch.remoteAddress().getAddress());
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new NettyServerHandler());
                                pipeline.addLast(new StringEncoder());
                            }
                        });
    }

    @PostConstruct
    public void starter() throws InterruptedException {
        try {
            if (!isRunning) {
                channelFuture = serverBootstrap.bind(16668)
                        .addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (future.isSuccess()) {
                                    System.out.println("监听端口 16668 成功");
                                    isRunning = true;
                                } else {
                                    log.error("监听端口 16668 失败");
                                }
                            }
                        }).channel().closeFuture().sync();
            }
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

第二步:XXXX

示例四:RabbitMQ作为数据源

1

示例五:待定

1

2.2 DataSet 批处理

1

2.3 Table API/SQL

1

1

2.4 关于序列化

1

111111

3 集群模式

1

1

1

三、高阶功能使用

1

四、原理解析

1

总结

提示:这里对文章进行总结:
例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/747512.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号