栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【大数据Flink】入门案例 -- WordCount

【大数据Flink】入门案例 -- WordCount

Flink-WordCount

下面主要是使用 DataSet 的方式去实现,在 Flink 1.14版本之后,DataSet 的方式被弃用,主要开始使用 DataStream 的方式

1. env 环境准备
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
2. Source 加载数据

用 , 分隔表示两行数据

DataSet lineDS = env.fromElements("Who's there?",
        "I think I hear them. Stand, ho! Who's there?");
3. transformation 数据转换处理

这一步是最关键的一部,大致经过4个步骤。

切割、标记、分组、聚合 3.1 切割

关键函数:flatMap,继承 FlatMapFunction,FlatMapFunction有两个参数(String 类型),分别代表输入和输出

DataSet words = lineDS.flatMap(new FlatMapFunction() {
    @Override
    public void flatMap(String value, Collector out) throws Exception {
        
        String[] arrStr = value.split(" ");     // 以空格当做分隔符
        for(String s:arrStr){
            out.collect(s);
        }
    }
});
3.2 标记

map方法的功能是标记,源代码为

public  MapOperator map(MapFunction mapper) {
        if (mapper == null) {
            throw new NullPointerException("Map function must not be null.");
        } else {
            String callLocation = Utils.getCallLocationName();
            TypeInformation resultType = TypeExtractor.getMapReturnTypes(mapper, this.getType(), callLocation, true);
            return new MapOperator(this, resultType, (MapFunction)this.clean(mapper), callLocation);
        }
    }

MapFunction 是一个接口,其中有唯一一个抽象方法,源代码为

public interface MapFunction extends Function, Serializable {
    O map(T var1) throws Exception;
}

Tuple2 表示二维元组,Tuple2 第一个参数为 String,第二个参数为 Integer。

下面用匿名函数的方式去实现map方法,完成数据标记的功能

// 数据标记为1,wordAndOne是被标记完成的数据,数据类型为二元组
// MapFunction 方法的功能为数据标记,输入String,输出Tuple2
DataSet> wordAndOne = words.map(new MapFunction>() {
    @Override
    public Tuple2 map(String s) throws Exception {
        // s 表示每一个单词。输入类型String,返回类型Tuple2
        return Tuple2.of(s,1);
    }
});
3.3 分组
// 分组 -- 对标记完成的数据进行分组。
// 0 表示对Tuple中的第一个参数进行分组
        UnsortedGrouping> grouped = wordAndOne.groupBy(0);
3.4 聚合
// 聚合 -- 1 表示对Tuple中的第二个参数进行聚合
        AggregateOperator> sum = grouped.sum(1);
4. Sink 数据输出
sum.print();

源码
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 有两个参数,也就是两行数据
        // 父类类型 接收子类对象 是完全可以的
        DataSet lineDS = env.fromElements("Who's there?",
                "I think I hear them. Stand, ho! Who's there?");

        // 数据切割-匿名函数
        DataSet words = lineDS.flatMap(new FlatMapFunction() {
            @Override
            public void flatMap(String value, Collector out) throws Exception {
                
                String[] arrStr = value.split(" ");
                for(String s:arrStr){
                    out.collect(s);
                }
            }
        });

        // 数据标记为1,wordAndOne是被标记完成的数据
        // MapFunction 方法的功能为数据标记,输入String,输出Tuple2
        DataSet> wordAndOne = words.map(new MapFunction>() {
            @Override
            public Tuple2 map(String s) throws Exception {
                // s 表示每一个单词。输入类型String,返回类型Tuple2
                return Tuple2.of(s,1);
            }
        });

        // 分组 -- 对标记完成的数据进行分组。
        // 0 表示对Tuple中的第一个参数进行分组
        UnsortedGrouping> grouped = wordAndOne.groupBy(0);

        // 聚合 -- 1 表示对Tuple中的第二个参数进行聚合
        AggregateOperator> sum = grouped.sum(1);

        try {
            sum.print();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

依赖



    4.0.0

    org.example
    WordCount
    1.0-SNAPSHOT

    
        1.8
        1.8
    

    
        
            org.apache.flink
            flink-java
            1.13.1
            compile
        
        
            log4j
            log4j
            1.2.17
            runtime
        
        
            org.slf4j
            slf4j-log4j12
            1.7.33
            runtime
        
        
            org.apache.flink
            flink-clients_2.12
            1.13.1
            compile
        
        
            org.slf4j
            slf4j-api
            1.7.33
            compile
        
    
    
        
            
                org.apache.maven.plugins
                maven-shade-plugin
                3.0.0
                
                    
                        package
                        
                            shade
                        
                        
                            
                                
                                    *:*
                                    
                                        meta-INF/*.SF
                                        meta-INF/*.DSA
                                        meta-INF/*.RSA
                                    
                                
                            
                            
                                
                                    
                                
                            
                        
                    
                
            
        
    


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/728672.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号