栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Flink 典型测试用例 WordCount 之实时流处理和批处理实验(二)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Flink 典型测试用例 WordCount 之实时流处理和批处理实验(二)

批处理实验(DataSet API)

TestWordCount.java

public class TestWordCount {
    public static void main(String[] args) throws Exception {
        // 获取执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 读取文件
        String inputPath = "E:\Projects\bigdata\flink\flink-study\demo01\src\main\resources\textfile.txt";
        DataSource dataSource = env.readTextFile(inputPath);
        // 处理数据
        DataSet> wordCount = dataSource
                .flatMap(new LineToWordOne())
                .groupBy(0)
                .sum(1);
        // output
        wordCount.print();
    }

    // 处理数据:每一条记录用空格隔开,每个单词组成一个二元组(word , 1)
    public static class LineToWordOne implements FlatMapFunction> {

        @Override
        public void flatMap(String line, Collector> out) throws Exception {
            String[] split = line.split(" ");
            for (String wold: split){
                out.collect(new Tuple2(wold,1));
            }
        }
    }
}
流处理实验 (DataStream API) 有界流处理

将文件中的数据读取成数据流,然后提交任务到 Flink 执行,以此来模拟有界流数据处理。

StreamWordCount .java

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 获取启动参数
        ParameterTool parameters = ParameterTool.fromArgs(args);
        boolean isNetcat = parameters.getBoolean("isNetcat");
		
        // 获取流处理环境
        StreamExecutionEnvironment streamEnv 
        	= StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        streamEnv.setParallelism(3);

		// 读取数据流
        String filePath = parameters.get("filepath");
        DataStreamSource dataSource = streamEnv.readTextFile(filePath);

        // 处理流数据
        SingleOutputStreamOperator> resultStream 
        	= dataSource.flatMap(new LineToWordOne())
                .keyBy(0)
                .sum(1);

        resultStream.print();

        
        streamEnv.execute();
    }

    // 处理数据:每一条记录用空格隔开,每个单词组成一个二元组(word , 1)
    public static class LineToWordOne implements FlatMapFunction> {

        @Override
        public void flatMap(String line, Collector> out) throws Exception {
            String[] split = line.split(" ");
            for (String wold: split){
                out.collect(new Tuple2(wold,1));
            }
        }
    }
}

对比批处理的代码逻辑可以看出,流处理需要先定义好一个数据处理的流程,然后将任务提交到 Flink 中执行,数据是一个一个实时处理的,即使是有界流,也是根据定义好的逻辑读取数据流,然后攒到界限后一个一个处理,而批处理中,使用 DataSetAPI 数据先汇聚到一个数据集再分组计算:

无界流数据处理实验

修改 StreamWordCount.java 类,让它可选择的处理有界流或者无界流

package org.flink.study.practice01;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 获取启动参数
        ParameterTool parameters = ParameterTool.fromArgs(args);
        boolean isNetcat = parameters.getBoolean("isNetcat");

        // 获取流处理环境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        streamEnv.setParallelism(3);

        DataStreamSource dataSource;
        if (!isNetcat ){
            System.out.println("测试 1:从文件中读取文件流,其实还是一个有界流");
            String filePath = parameters.get("filepath");
            dataSource = streamEnv.readTextFile(filePath);
        } else {
            System.out.println("测试 2:从中 socket 中读取流,无界流,实时处理");
            String host = parameters.get("host");
            int port = parameters.getInt("port");
            dataSource = streamEnv.socketTextStream(host,port);
        }

        // 处理流数据
        SingleOutputStreamOperator> resultStream = dataSource.flatMap(new LineToWordOne())
                .keyBy(0)
                .sum(1);

        resultStream.print();

        //执行任务
        streamEnv.execute();
    }

    // 处理数据:每一条记录用空格隔开,每个单词组成一个二元组(word , 1)
    public static class LineToWordOne implements FlatMapFunction> {

        @Override
        public void flatMap(String line, Collector> out) throws Exception {
            String[] split = line.split(" ");
            for (String wold: split){
                out.collect(new Tuple2(wold,1));
            }
        }
    }
}

idea开发环境启动参数,使用无界流数据:

在虚拟机上开启一个 socket 监听 9999 端口,用来发数据:

nc -lk 9999

启动应用程序,向 Socket 中实时输入数据

输入数据:

程序控制台实时输出:

不漏算测试

关闭程序,在 netcat 开启的 socket 中继续追加输入,然后启动程序,发现程序会从之前的位置继续处理数据:


可以看到数据不会漏算,如果要实现故障恢复,还需要将状态定期持久化,并设置检查点。

实验代码地址

git地址:https://github.com/hubo-admin/flink-study

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/346423.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号