栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink的Java Api 实现WordCount的批处理和流处理

Flink的Java Api 实现WordCount的批处理和流处理

1 基础说明

Flink 是一款优秀的批处理和流处理的大数据计算引擎,本文将通过Flink的Java Api实现WordCount多版本案例。更多请查阅 Flink官网
说明:

    Flink版本:1.13.5Flink Web UI地址:http://192.168.18.88:7999服务器具备nc(netcat)环境,如果不具备,可在服务器执行安装命令
yum -y install netcat

2 WordCount案例 2.1 项目目录结构

创建maven项目,目录结构如下:

2.2 maven依赖

依赖内容如下:


  
    org.apache.flink
    flink-streaming-java_2.12
    1.14.4
  
  
    org.apache.flink
    flink-java
    1.14.4
  
  
    org.apache.flink
    flink-clients_2.12
    1.14.4
  

完整的pom.xml文件如下:




    4.0.0

    cn.mfox
    wordcount-demo
    1.0-SNAPSHOT

    wordcount-demo
    
    http://www.example.com

    
        UTF-8
        1.8
        1.8
    
    
        
            org.apache.flink
            flink-streaming-java_2.12
            1.14.4
        
        
            org.apache.flink
            flink-java
            1.14.4
        
        
            org.apache.flink
            flink-clients_2.12
            1.14.4
        
    
    
    
        
            
                maven-compiler-plugin
                3.6.0
                
                    1.8
                    1.8
                
            
        
    

2.3 创建word.txt文件

创建src/main/java/cn/mfox/word.txt文件,word.txt文件的具体内容如下:

洪七公 王重阳 欧阳锋 黄药师
洪七公
周伯通
郭靖
黄药师
黄蓉
林朝英

word.txt文件位置及内容截图如下:

2.4 批处理WordCount

WordCountByBatch.java代码如下:

package cn.mfox;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class WordCountByBatch {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 从文件中读取文件
        String inputPath = "src/main/java/cn/mfox/word.txt";
        DataSet inputDataSet = env.readTextFile(inputPath);
        // 空格分词打散之后,对单词进行groupby分组,然后用sum进行聚合
        // 对数据集进行处理,按空格分词展开,转换成(word,1)二元组进行统计
        // groupBy(0):按照第一个位置的word分组
        // sum(1):将第二个位置上的数据求和
        DataSet> wordCountDataSet = inputDataSet.flatMap(new MyFlatMapper())
                .groupBy(0)
                .sum(1);
        // 打印输出
        wordCountDataSet.print();
    }

    
    private static class MyFlatMapper implements FlatMapFunction> {
        
        @Override
        public void flatMap(String value, Collector> out) throws Exception {
            // 分词
            String[] words = value.split(" ");
            // 遍历所有word,包装成二元组输出
            for (String word : words) {
                // 排除空格
                word = word.trim();
                if (!word.isEmpty()) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        }
    }
}

程序运行结果如下:

2.5 流处理WordCount

WordCountByStream.java代码如下:

package cn.mfox;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class WordCountBySocketStream {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 获取ParameterTool
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        // 从socket文本流读取数据
        DataStream inputDataStream = env.socketTextStream(host, port);
        // 数据流转换操作
        SingleOutputStreamOperator> outputStreamOperator = inputDataStream
        .flatMap(new MyFlatMapper())
        .keyBy(value -> value.f0)
        .sum(1);
        outputStreamOperator.print();
        // 启动任务
        env.execute();
    }

    
    private static class MyFlatMapper implements FlatMapFunction> {
        
        @Override
        public void flatMap(String value, Collector> out) throws Exception {
            // 分词
            String[] words = value.split(" ");
            // 遍历所有word,包装成二元组输出
            for (String word : words) {
                // 排除空格
                word = word.trim();
                if (!word.isEmpty()) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        }
    }
}

程序运行结果如下:

2.6 Socket流处理WordCount
    shell窗口中创建socket端口,命令如下
nc -lk 18888
    socket端口中输入测试数据,截图如下:
    编写 WordCountBySocketStream.java 文件,代码如下:
package cn.mfox;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class WordCountBySocketStream {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 获取ParameterTool
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String host = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        // 从socket文本流读取数据
        DataStream inputDataStream = env.socketTextStream(host, port);
        // 数据流转换操作
        SingleOutputStreamOperator> outputStreamOperator = inputDataStream
        .flatMap(new MyFlatMapper())
        .keyBy(value -> value.f0)
        .sum(1);
        outputStreamOperator.print();
        // 启动任务
        env.execute();
    }

    
    private static class MyFlatMapper implements FlatMapFunction> {
        
        @Override
        public void flatMap(String value, Collector> out) throws Exception {
            // 分词
            String[] words = value.split(" ");
            // 遍历所有word,包装成二元组输出
            for (String word : words) {
                // 排除空格
                word = word.trim();
                if (!word.isEmpty()) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        }
    }
}
    在当前工程目录下执行maven打包命令
mvn clean package

流程截图如下:

    在Flink的Web界面(Submit New Job)菜单中上传Jar包,上传成功后的截图如下:
    界面点击刚上传的jar包,输入要运行的类、socket的地址、端口,输入的内容如下:
cn.mfox.WordCountBySocketStream
--host 192.168.18.88 --port 18888

截图如下:

    在Running Jobs菜单中可查看刚提交的Flink程序的执行情况,如各个算子的并行度,接受数据的条数大小等,截图如下:
    在Task Manages菜单下可以查看Flink程序的日志输出,截图如下:

3 结束语

至此,文档篇幅结束…
因初次接触Flink及个人水平有限,如有错误,欢迎各位大佬点评 !

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/784914.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号