
Flink Java: implementing WordCount with map and reduce


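Overall approach
  1. The data source reads the contents of a file as a stream.
  2. Each line is split on spaces.
  3. Each word is wrapped in a Tuple2: the first field is the word, the second is its count (initially 1).
  4. The tuples are grouped by key (the word), and each group is aggregated (reduced).
  5. The result is printed.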
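
Steps 2 to 4 can be tried without a Flink cluster. The following is a minimal plain-Java sketch, not the Flink job itself: it assumes an in-memory list of lines instead of a file stream and uses Map.Entry in place of Flink's Tuple2. The names WordCountSketch, toPairs and count are hypothetical and chosen only for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WordCountSketch {

    // Steps 2-3: split one line on single spaces and pair every word with a count of 1.
    static List<Map.Entry<String, Integer>> toPairs(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.split(" ")) {
            pairs.add(new SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    // Step 4: group the pairs by word and reduce each group by adding the counts.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by word for stable output
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : toPairs(line)) {
                counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello world", "hello flink", "hello spark");
        // Step 5: prints {flink=1, hello=3, spark=1, world=1}
        System.out.println(count(lines));
    }
}
```

Unlike this batch-style sketch, which prints one final map, a streaming job processes records one at a time and has no natural "end" at which to emit a single total.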
Text file

Create a file named hello.txt under the resources directory of the Maven project, with the following content:

hello world
hello flink
hello spark
When we have shuffled off this mortal coil
When we have shuffled off this mortal coil
ack
Code
package transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountKeyBy {
    public static void main(String[] args) throws Exception {
        // 1. Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the data from the file
        DataStream<String> dataStream = env.readTextFile("src/main/resources/hello.txt");
        // Set the parallelism of the execution environment to 3
        env.setParallelism(3);

        // 3. Split each line on spaces and emit a (word, 1) tuple per word
        DataStream<Tuple2<String, Integer>> wordStream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        });
        // 4. Group by the word (field f0)
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = wordStream.keyBy(tuple -> tuple.f0);
        // 5. Aggregate: sum the count (field 1) within each group
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = keyedStream.sum(1);
        resultStream.print();
        // Run the job
        env.execute();
    }
}
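
On a keyed stream, sum(1) is a rolling aggregation: in streaming mode it emits an updated total for every incoming record rather than one final count per word. The plain-Java sketch below imitates that behaviour with an explicit reduce function, which is also how the "reduce" in the title relates to sum(1). It is a simulation under stated assumptions, not Flink code: RollingSumSketch, ADD_COUNTS and rollingSum are hypothetical names, a HashMap stands in for Flink's keyed state, and Map.Entry stands in for Tuple2.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

final class RollingSumSketch {

    // Reduce function: merge two (word, count) pairs with the same key by adding their counts.
    static final BinaryOperator<Map.Entry<String, Integer>> ADD_COUNTS =
            (a, b) -> new SimpleEntry<>(a.getKey(), a.getValue() + b.getValue());

    // Feed the words one at a time and record the updated pair produced after each one.
    static List<String> rollingSum(List<String> words) {
        Map<String, Map.Entry<String, Integer>> state = new HashMap<>(); // one entry per key
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            Map.Entry<String, Integer> incoming = new SimpleEntry<>(word, 1);
            // First occurrence stores the pair as-is; later ones are reduced into the stored pair.
            Map.Entry<String, Integer> updated = state.merge(word, incoming, ADD_COUNTS);
            emitted.add("(" + updated.getKey() + "," + updated.getValue() + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [(hello,1), (world,1), (hello,2), (flink,1), (hello,3), (spark,1)]
        System.out.println(rollingSum(
                Arrays.asList("hello", "world", "hello", "flink", "hello", "spark")));
    }
}
```

In the real job the printed lines should look similar, but with parallelism 3 each line is prefixed with the index of the subtask that printed it, and the relative order of different words is not guaranteed.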

Appendix: pom.xml


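<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.myorg.quickstart</groupId>
    <artifactId>quickstart</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.2</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.myorg.quickstart.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>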

When reposting, please credit the source: www.mshxw.com
Article URL: https://www.mshxw.com/it/279918.html