参考资料运行环境一、Java MapReduce 编程规范
1.1 Mapper阶段1.2 Reduce阶段1.3 Driver阶段 二、WordCount 案例
2.1 Mapper2.2 Reducer2.3 Driver2.4 DeBug调试 三、MR打包提交到Hadoop集群
参考资料视频链接
运行环境windows10CentOS7虚拟机JDK8Hadoop3.1.3 一、Java MapReduce 编程规范
用于编写的程序分为三个部分:Mapper、Reducer和Driver
1.1 Mapper阶段- 用户自定义的Mapper需继承自己的父类Mapper的输入数据是KV对的形式(KV类型可自定义)Mapper中的业务逻辑卸载map() 方法中Maper的输出数据是KV对的形式(KV类型可自定义)map()方法(MapTask进程)对每一个
- 用户自定义的Reducer要继承自己的父类Reducer的输入数据类型赌赢Mapper的输出数据类型,也是KV、Reducer的业务逻辑写在reduce()方法中ReduceTask进程对每一组相同k的
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的Job对象。
二、WordCount 案例需求:在给定的文本文件中统计输出每一个单词出现的总次数
测试数据:
hello wolrd hello spark my name is uni hadoop is nice hadoop
预期结果:
hello 2 hadoop 2 is 2 my 1 name 1 nice 1 spark 1 uni 1 world 12.1 Mapper
- 将MapTask传给文本内容,先转换成String根据空格将这一行切分成单词将单词转化为<单词, 1> 的形式
WordCountMapper.java
package com.uni.mr.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper2.2 Reducer{ // 放在上面声明防止在循环里多次创建对象,浪费空间 private Text outKey = new Text(); private IntWritable outValue = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1. 获取一行 String line = value.toString(); // 2. 切割 String[] words = line.split(" "); // 3. 循环写出 for (String word : words) { // 封装 outKey outKey.set(word); context.write(outKey, outValue); } } }
- 汇总各个key的个数输出该key的总次数
WordCountReducer.java
package com.uni.mr.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer2.3 Driver{ private IntWritable outValue = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; // 累加 for (IntWritable value : values) { sum += value.get(); } outValue.set(sum); // 写出 context.write(key, outValue); } }
WordCountDriver.java
package com.uni.mr.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class WordCountDriver{
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1. 获取 job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 设置jar包路径
job.setJarByClass(WordCountDriver.class);
// 3. 关联 mapper 和 reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4. 设置 map 输出的 k v 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5. 设置最终输出的k v类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6. 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("testInput"));
FileOutputFormat.setOutputPath(job, new Path("testOutput"));
// 7. 提交 job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
运行结果:
2.4 DeBug调试先把调试的断点打好
Map阶段:
Reduce阶段
IDEA 调试的按钮说明:
调试进入Map阶段:
在Mapper阶段执行run方法时通过while循环判断context.nextKeyValue(),最后释放资源cleanup(context)由MapTask类的方法关闭资源,为方便观察Reduce阶段,直接点击IDEA左侧的恢复程序按钮,这样就能直接跳到下一个Reducer类里打的断点。
进入Reduce阶段:
和Map阶段一样,Reducer也是通过run方法来执行任务,但是它的while循环判断是通过context.nextKey(),Reducer关键源码如下:
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator)iter).resetBackupStore();
}
}
而 Mapper类的部分是这样的:
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
通过这个可以观察到,Reduce阶段多了一个步骤,这个步骤将会在之后的总结里记录。
三、MR打包提交到Hadoop集群打包前,先将WordCountDriver类进行修改,实现根据命令输入的参数来进行指定路径的词频统计,就跟之前运行官方示例的效果一样,修改后的内容:
WordCountDriver.java
package com.uni.mr.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class WordCountDriver{
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 判断输入的参数是否符合要求
if(args.length != 2) {
System.out.println("请输入两个参数: [词频统计的输入文件目录] [输出的文件目录]");
System.exit(1);
}
// 1. 获取 job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2. 设置jar包路径
job.setJarByClass(WordCountDriver.class);
// 3. 关联 mapper 和 reducer
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4. 设置 map 输出的 k v 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5. 设置最终输出的k v类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6. 设置输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7. 提交 job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
在使用Maven打包Jar程序时,若想要将项目的依赖一起打包在jar包中,就需先在pom.xml里导入相关的打包插件:(注:build标签必须放在dependency标签的后面)笔者在插件中制定了JDK版本,这里笔者用的是JDK8,若不相同记得修改。
maven-compiler-plugin 3.1 1.8 1.8 maven-assembly-plugin jar-with-dependencies make-assembly package single
Maven 打包
由于是上传到集群,本身具有MR运行的jar包,所以就将不带依赖的jar包上传到集群的节点中。
注: 上传到集群后,Hadoop是采用集群的配置,路径地址就变成了HDFS的地址,而不是之前在windows的IDEA里调试的本地地址,所以需要在集群节点里创建测试的文本,上传到HDFS(步骤略)。
上传后好文件后执行jar包,执行前记得先开启hdfs和yarn
hadoop jar HDFSLearn-1.0-SNAPSHOT.jar com.uni.mr.wordcount.WordCountDriver /input /output
查看运行结果:
hadoop fs -cat /output/*
可以看到和IDEA里运行的结果一样 ,但是否确认就是刚才的jar包起的作用呢?现故意少输入一个参数,看看有没有提示:
hadoop jar HDFSLearn-1.0-SNAPSHOT.jar com.uni.mr.wordcount.WordCountDriver /input
可以看到,结果和程序里写的提示信息是一样的。
至此,MapReduce本地编程,提交到集群的运行测试已完毕。



