栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop | MapReduce学习笔记(一)使用Java实现MapReduce编程 | 打包词频统计程序为jar包提交到Hadoop集群并运行 | Mapper、Reducer、Driver

Hadoop | MapReduce学习笔记(一)使用Java实现MapReduce编程 | 打包词频统计程序为jar包提交到Hadoop集群并运行 | Mapper、Reducer、Driver

文章目录

参考资料运行环境一、Java MapReduce 编程规范

1.1 Mapper阶段1.2 Reduce阶段1.3 Driver阶段 二、WordCount 案例

2.1 Mapper2.2 Reducer2.3 Driver2.4 DeBug调试 三、MR打包提交到Hadoop集群

参考资料

视频链接

运行环境

windows10CentOS7虚拟机JDK8Hadoop3.1.3 一、Java MapReduce 编程规范


用于编写的程序分为三个部分:Mapper、Reducer和Driver

1.1 Mapper阶段
    用户自定义的Mapper需继承自己的父类Mapper的输入数据是KV对的形式(KV类型可自定义)Mapper中的业务逻辑卸载map() 方法中Maper的输出数据是KV对的形式(KV类型可自定义)map()方法(MapTask进程)对每一个调用一次
1.2 Reduce阶段
    用户自定义的Reducer要继承自己的父类Reducer的输入数据类型赌赢Mapper的输出数据类型,也是KV、Reducer的业务逻辑写在reduce()方法中ReduceTask进程对每一组相同k的 组调用一次reduce()方法
1.3 Driver阶段

相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的Job对象。

二、WordCount 案例

需求:在给定的文本文件中统计输出每一个单词出现的总次数

测试数据:

hello wolrd
hello spark
my name is uni
hadoop is nice
hadoop

预期结果:

hello 2
hadoop 2
is 2
my 1
name 1
nice 1
spark 1
uni 1
world 1
2.1 Mapper
    将MapTask传给文本内容,先转换成String根据空格将这一行切分成单词将单词转化为<单词, 1> 的形式
    WordCountMapper.java
package com.uni.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class WordCountMapper extends Mapper {
    // 放在上面声明防止在循环里多次创建对象,浪费空间
    private Text outKey = new Text();
    private IntWritable outValue = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. 获取一行
        String line = value.toString();
        // 2. 切割
        String[] words = line.split(" ");
        // 3. 循环写出
        for (String word : words) {
            // 封装 outKey
            outKey.set(word);
            context.write(outKey, outValue);
        }
    }
}

2.2 Reducer
    汇总各个key的个数输出该key的总次数

WordCountReducer.java

package com.uni.mr.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class WordCountReducer extends Reducer {
    private  IntWritable outValue = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        // 累加
        for (IntWritable value : values) {
            sum += value.get();
        }
        outValue.set(sum);
        // 写出
        context.write(key, outValue);
    }
}

2.3 Driver

WordCountDriver.java

package com.uni.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WordCountDriver{
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. 获取 job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. 设置jar包路径
        job.setJarByClass(WordCountDriver.class);
        // 3. 关联 mapper 和 reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. 设置 map 输出的 k v 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. 设置最终输出的k v类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("testInput"));
        FileOutputFormat.setOutputPath(job, new Path("testOutput"));
        // 7. 提交 job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

运行结果:

2.4 DeBug调试

先把调试的断点打好
Map阶段:


Reduce阶段


IDEA 调试的按钮说明:

调试进入Map阶段:

在Mapper阶段执行run方法时通过while循环判断context.nextKeyValue(),最后释放资源cleanup(context)由MapTask类的方法关闭资源,为方便观察Reduce阶段,直接点击IDEA左侧的恢复程序按钮,这样就能直接跳到下一个Reducer类里打的断点。


进入Reduce阶段:

和Map阶段一样,Reducer也是通过run方法来执行任务,但是它的while循环判断是通过context.nextKey(),Reducer关键源码如下:

while (context.nextKey()) {
  reduce(context.getCurrentKey(), context.getValues(), context);
  // If a back up store is used, reset it
  Iterator iter = context.getValues().iterator();
  if(iter instanceof ReduceContext.ValueIterator) {
    ((ReduceContext.ValueIterator)iter).resetBackupStore();        
  }
}

而 Mapper类的部分是这样的:

while (context.nextKeyValue()) {
   map(context.getCurrentKey(), context.getCurrentValue(), context);
}

通过这个可以观察到,Reduce阶段多了一个步骤,这个步骤将会在之后的总结里记录。

三、MR打包提交到Hadoop集群

打包前,先将WordCountDriver类进行修改,实现根据命令输入的参数来进行指定路径的词频统计,就跟之前运行官方示例的效果一样,修改后的内容:

WordCountDriver.java

package com.uni.mr.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WordCountDriver{
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 判断输入的参数是否符合要求
        if(args.length != 2) {
            System.out.println("请输入两个参数: [词频统计的输入文件目录] [输出的文件目录]");
            System.exit(1);
        }
        // 1. 获取 job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. 设置jar包路径
        job.setJarByClass(WordCountDriver.class);
        // 3. 关联 mapper 和 reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 4. 设置 map 输出的 k v 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. 设置最终输出的k v类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 6. 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 7. 提交 job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

在使用Maven打包Jar程序时,若想要将项目的依赖一起打包在jar包中,就需先在pom.xml里导入相关的打包插件:(注:build标签必须放在dependency标签的后面)笔者在插件中制定了JDK版本,这里笔者用的是JDK8,若不相同记得修改。


     
         
             maven-compiler-plugin
             3.1
             
                 1.8
                 1.8
             
         
         
             maven-assembly-plugin
             
                 
                     jar-with-dependencies
                 
             
             
                 
                     make-assembly
                     package
                     
                         single
                     
                 
             
         
     
 

Maven 打包


由于是上传到集群,本身具有MR运行的jar包,所以就将不带依赖的jar包上传到集群的节点中。

注: 上传到集群后,Hadoop是采用集群的配置,路径地址就变成了HDFS的地址,而不是之前在windows的IDEA里调试的本地地址,所以需要在集群节点里创建测试的文本,上传到HDFS(步骤略)。

上传后好文件后执行jar包,执行前记得先开启hdfs和yarn

hadoop jar HDFSLearn-1.0-SNAPSHOT.jar com.uni.mr.wordcount.WordCountDriver /input /output

查看运行结果:

hadoop fs -cat /output/*


可以看到和IDEA里运行的结果一样 ,但是否确认就是刚才的jar包起的作用呢?现故意少输入一个参数,看看有没有提示:

hadoop jar HDFSLearn-1.0-SNAPSHOT.jar com.uni.mr.wordcount.WordCountDriver /input


可以看到,结果和程序里写的提示信息是一样的。

至此,MapReduce本地编程,提交到集群的运行测试已完毕。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/712144.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号