栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

01-Flink 入门

01-Flink 入门

01-Flink 入门
  • 1. Flink 定义
    • Streams
    • State
    • Time
    • API
  • 2. Flink 快速上手
    • 创建 maven 工程
    • wordcount
      • 批处理
      • 流处理
  • 3.Flink DataStream 程序结构

1. Flink 定义

      Apache Flink is a framework and distributed process engine for stateful computations over unbounded and bounded data streams.

      Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。

Streams

      Flink 认为,一切都是由流组成的,离线数据是有界的流;实时数据是一个没有界限的流。

      Bounded streams: 有界流,有始有终的数据集合,数据集合的大小是有限定的。(1TB 的数据)

      Unbounded stream:无界流,有始无终的的数据流,数据的多少是不确定的。

State


      无状态:数据来一条处理一条,数据无需留在系统中。

      有状态:数据或数据的计算结果需要在系统中进行保留。

Time

       在我们的传统编码过程中,并没有对于时间做严格的区分,往往是数据来一条便处理一条,但此时所处理的数据并不是数据产生的真实时间。数据需要通过网络进行传输,如果网络时延较大,程序处理距离数据产生的真实时间也就有了较大的时差。

       Event Time:事件发生的时间。

       Ingestion Time: 事件进入 Flink 的时间。

       Process Time:Flink 开始处理事件的时间。

API

       Flink 中的 API 分为一下三类:

  • 越顶层越抽象,表达含义越简明,使用越方便。
  • 越底层越具体,表达能力越丰富,使用越灵活。

2. Flink 快速上手 创建 maven 工程


    4.0.0

    com.tencent
    FlinkTutorial
    1.0-SNAPSHOT

    
        
            org.apache.flink
            flink-java
            1.10.1
        

        
            org.apache.flink
            flink-streaming-java_2.12
            1.10.1
        
    

wordcount

       有如下一个需求,通过 Flink 统计单词出现的个数。

批处理

       hello.txt 文件内容如下:

hello flink
hello spark
hello datahub
hello java

       批处理代码如下:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// 批处理 word count
public class WordCount {

    public static void main(String[] args) throws Exception {
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 从文件中读取数据
        String inputPath = "hello.txt";
        DataSet inputDataSet = env.readTextFile(inputPath);

        // 对数据集进行处理,按空格分词展开,转换成(word, 1)二元组进行统计
        DataSet> resultSet = inputDataSet.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String value, Collector> out) {
                // 按空格分词
                String[] words = value.split(" ");
                // 遍历所有 word,包成二元组输出
                for (String word : words) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        })
        // 按照第一个位置的 word 进行分组
        .groupBy(0)
        // 将第二个位置上的数据求和
        .sum(1);

        resultSet.print();
    }

}

流处理

       流处理代码如下:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从文件中读取数据
        String inputPath = "hello.txt";
        DataStream inputDataStream = env.readTextFile(inputPath);

        // 基于数据流进行转换计算
        DataStream resultStream = inputDataStream.flatMap(new FlatMapFunction>() {
            @Override
            public void flatMap(String value, Collector> out) {
                // 按空格分词
                String[] words = value.split(" ");
                // 遍历所有 word,包成二元组输出
                for (String word : words) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        })
        // 按照第一个位置的 word 进行分组
        .keyBy(0)
        // 将第二个位置上的数据求和
        .sum(1);

        resultStream.print();

        // 执行任务
        env.execute();
    }
}

       可以看到,相较于批处理而言,流处理的输出在变化,每来一个单词,都在之前单词的基础上进行了一次重新计算。

       上述流处理的代码是从文件中读取数据,为了更加直观的体验流处理,我们通过 Socket 的方式读取内容,这样就更加符合流处理的场景了,不确定数据何时到以及数据的多少。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 用 ParameterTool 工具从程序启动参数中提取配置项
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");

        DataStream inputDataStream = env.socketTextStream(host, port);

        // 基于数据流进行转换计算
        DataStream resultStream = inputDataStream.flatMap(new FlatMapFunction>() {
            public void flatMap(String value, Collector> out) throws Exception {
                // 按空格分词
                String[] words = value.split(" ");
                // 遍历所有 word,包成二元组输出
                for (String word : words) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        })
        // 按照第一个位置的 word 进行分组
        .keyBy(0)
        // 将第二个位置上的数据求和
        .sum(1);

        resultStream.print();

        // 执行任务
        env.execute();
    }

}

       在 IDE 中配置启动参数,如下:

       --host localhost --port 7777

       首先在终端中监听 7777 端口,然后启动程序。

       接着通过终端发送消息,观察输出。


       可以看到,已经有输出了,我们接着输入。


       程序已经成功输出了每个单词出现的次数。

       这里你可能有一个疑问,就是单词前面出现的数字是啥。这是因为我们知道 Flink 是分布式的,即有多个节点并行运行,这些数字是这节点的编号。

       我们可以通过 env.setParallelism(1); 将并行度设置为 1,这样就不会有多个节点运行。

       重新运行程序,可以看到不再有节点的标示数字了。

3.Flink DataStream 程序结构

       

       通过上述 Flink 的体验,可以得知 Flink DataStream 程序结构主要分为以下几个步骤:

1、设置运行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2、配置数据源读取数据

DataStream text = env.readTextFile("input");

3、进行一系列转换

DataStream> counts = text.flatMap(...).keyBy(0).sum(1);

4、配置数据汇写出数据

counts.writerAsText("output");

5、提交执行(一定别忘记执行,否则程序并不会执行)

env.execute("Streaming WordCount");
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389317.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号