目录
一. 【实验准备】
1.工作目录
2.打开eclipse并配置工作空间
二、准备工作
1. 新建项目
2. 准备测试数据
3. 添加 MapReduce 编程框架
三、Map过程
四、Reduce过程
五、执行MapReduce任务
六、实验结果
七、准备工作
1. 新建项目
2. 准备relation.dat
3. 添加 MapReduce 编程框架
(1).MyMapper
(2).MyReducer
(3).MyRunner
八、Map过程
九、Reduce过程
十、执行MapReduce任务
十一、实验结果
十二、准备工作
1.新建项目
2.准备 input.dat
十三、Map过程
十四、Combine过程
十五、Reduce过程
十六、执行MapReduce过程
十七、实验结果
一. 【实验准备】
参考《Hadoop安装部署》实验,安装部署配置了三个数据节点的Hadoop集群
1.工作目录
本实验的工作目录为~/course/hadoop/mr_pro,使用以下命令创建和初始化工作目录:
$ mkdir -p ~/course/hadoop/mr_pro $ cd ~/course/hadoop/mr_pro
2.打开eclipse并配置工作空间
在桌面右键打开终端输入如下命令打开eclipse:
eclipse &
打开eclipse后选择/headless/course/hadoop/mr_pro做为工作空间
二、准备工作
1. 新建项目
1.在eclipse中依次点击:File->New->Project->Map/Reduce Project->Next。
2.在项目名称(Project Name)处填入WordCount,将工程位置设置为文件夹/headless/course/hadoop/mr_pro/WordCount,点击Finish。
2. 准备测试数据
新建终端,使用如下命令新建一个文本文件:
# cd ~/course/hadoop/mr_pro/WordCount/ # mkdir target # mkdir data # cd data # echo "Hello World" >> file1.txt # echo "Hello MapReduce" >> file2.txt
使用如下命令进入master节点:
# docker exec -it --privileged master /bin/bash
主机的~/course目录挂载到了master节点的/course目录。
在master节点中使用如下命令新建目录,并将文本文件上传到目录:
# hadoop fs -mkdir -p mapreduce/WordCount/input # cd /course/hadoop/mr_pro/WordCount/data # hadoop fs -put file1.txt file2.txt mapreduce/WordCount/input # hadoop fs -ls mapreduce/WordCount/input Found 2 items -rw-r--r-- 3 bd1_cg bd1 12 2018-12-20 17:59 mapreduce/WordCount/input/file1.txt -rw-r--r-- 3 bd1_cg bd1 16 2018-12-20 17:59 mapreduce/WordCount/input/file2.txt
3. 添加 MapReduce 编程框架
MapReduce 编程框架分为三个部分,请在 Eclipse 中的 WordCount 下分别创建如下三个类,内容分别如下:
(1).WcMapper
public class WcMap extends Mapper{ //重写map这个方法 //mapreduce框架每读一行数据就调用一次该方法 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value //key是这一行数据的起始偏移量,value是这一行的文本内容 } }
(2).WcReducer
public class WcReduce extends Reducer{ //继承Reducer之后重写reduce方法 //第一个参数是key,第二个参数是集合。 //框架在map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组 ,调用一次reduce方法 protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { } }
(3).WcRunner
public class WcRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置文件
Configuration conf = new Configuration();
//获取一个作业
Job job = Job.getInstance(conf);
//设置整个job所用的那些类在哪个jar包
job.setJarByClass(WcRunner.class);
//本job使用的mapper和reducer的类
job.setMapperClass(WcMap.class);
job.setReducerClass(WcReduce.class);
//指定reduce的输出数据key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定mapper的输出数据key-value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定要处理的输入数据存放路径
FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.51.149:9000/user/cg/input"));
//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.51.149:9000/user/cg/output"));
//将job提交给集群运行
job.waitForCompletion(true);
}
}
三、Map过程
Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,并重写其map方法。通过在map方法中添加两句把key值和value值输出到控制台的代码,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。然后StringTokenizer类将每一行拆分成为一个个的单词,并将
完整代码与解析如下:
import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class WcMap extends Mapper{ //重写map这个方法 //mapreduce框架每读一行数据就调用一次该方法 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value //key是这一行数据的起始偏移量,value是这一行的文本内容 //1: String str = value.toString(); //2:切分单词,空格隔开,返回切分开的单词 String[] words = StringUtils.split(str," "); //3:遍历这个单词数组,输出为key-value的格式,将单词发送给reduce for(String word : words){ //输出的key是Text类型的,value是LongWritable类型的 context.write(new Text(word), new LongWritable(1)); } } }
四、Reduce过程
Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。Map过程输出
完整代码与解析如下:
import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WcReduce extends Reducer{ //继承Reducer之后重写reduce方法 //第一个参数是key,第二个参数是集合。 //框架在map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组 ,调用一次reduce方法 // @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { //将values进行累加操作,进行计数 long count = 0; //遍历value的list,进行累加求和 for(LongWritable value : values){ count += value.get(); } //输出这一个单词的统计结果 //输出放到hdfs的某一个目录上面,输入也是在hdfs的某一个目录 context.write(key, new LongWritable(count)); } }
五、执行MapReduce任务
在MapReduce中,由Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。此处设置了使用TokenizerMapper完成Map过程中的处理和使用IntSumReducer完成Combine和Reduce过程中的处理。还设置了Map过程和Reduce过程的输出类型:key的类型为Text,value的类型为IntWritable。任务的输出和输入路径则由命令行参数指定,并由FileInputFormat和FileOutputFormat分别设定。完成相应任务的参数设定后,即可调用job.waitForCompletion()方法执行任务。
完整代码及解析如下:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WcRunner{
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置文件
Configuration conf = new Configuration();
//获取一个作业
Job job = Job.getInstance(conf);
//设置整个job所用的那些类在哪个jar包
job.setJarByClass(WcRunner.class);
//本job使用的mapper和reducer的类
job.setMapperClass(WcMap.class);
job.setReducerClass(WcReduce.class);
//指定reduce的输出数据key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定mapper的输出数据key-value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定要处理的输入数据存放路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//将job提交给集群运行
job.waitForCompletion(true);
}
}
六、实验结果
将项目导出为WordCount.jar 到本实验工作目录的WordCount/target子目录下,进入master节点,切换到 jar 包所在目录/course/hadoop/mr_pro/WordCount/target,输入以下命令提交作业到集群运行:
hadoop jar WordCount.jar mapreduce/WordCount/input mapreduce/WordCount/output #后面两个参数为HDFS中输入输出路径
运行结果如下:
查看 mapreduce/WordCount/output/part-r-00000
结果无误。
七、准备工作 1. 新建项目
在本实验工作目录下创建文件夹Relation。
在eclipse中依次点击:File->New->Project->Map/Reduce Project->Next。
在项目名称(Project Name)处填入“Relation”,将工程位置设置为上述的文件夹,点击“Finish”。
点击右侧链接下载log4j配置文件:log4j.properties.txt
然后将该文件上传至实验环境内的“Relation”项目的src目录下,并将文件重命名为log4j.properties。
2. 准备relation.dat
在eclipse的Relation目录上,单击右键->New->Folder,填入data,创建数据目录,如下所示。
在data目录上,单击右键->New->File,填入relation.dat,创建数据文件,数据文件内容如下:
Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse Terry Alice Terry Jesse Philip Terry Philip Alma Mark Terry Mark Alma
创建后的数据文件效果图如下所示:
接着,在HDFS创建目录,将数据上传到该目录,
在master节点上执行的命令和效果如下:
# cd /course/hadoop/mr_pro/Relation/data #进入数据目录 # hadoop fs -mkdir -p mapreduce/relation/input #在HDFS上创建目录 # hadoop fs -put relation.dat mapreduce/relation/input #上传数据到HDFS # hadoop fs -ls mapreduce/relation/input #查看数据 Found 1 items -rw-r--r-- 2 root supergroup 148 2020-02-07 11:49 mapreduce/relation/input/relation.dat3. 添加 MapReduce 编程框架
MapReduce 编程框架分为三个部分,请在 Eclipse 中的 WordCount 下分别创建如下三个类,内容分别如下:
(1).MyMapper
public class MyMap extends Mapper{
//重写map这个方法
//mapreduce框架每读一行数据就调用一次该方法
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value
//key是这一行数据的起始偏移量,value是这一行的文本内容
}
}
(2).MyReducer
public class MyReduce extends Reducer{
//继承Reducer之后重写reduce方法
//第一个参数是key,第二个参数是集合。
//框架在map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组,调用一次reduce方法
protected void reduce(Text key, Iterable values,Context context)
throws IOException, InterruptedException {
}
}
(3).MyRunner
public class MyRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置文件
Configuration conf = new Configuration();
//获取一个作业
Job job = Job.getInstance(conf);
//设置整个job所用的那些类在哪个jar包
job.setJarByClass(MyRunner.class);
//本job使用的mapper和reducer的类
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
//指定reduce的输出数据key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定mapper的输出数据key-value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定要处理的输入数据存放路径
FileInputFormat.setInputPaths(job, new Path("hdfs://node1.cg:8020/user/cg/input"));
//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://node1.cg:8020/user/cg/output"));
//将job提交给集群运行
job.waitForCompletion(true);
}
}
public class MyReduce extends Reducer{ //继承Reducer之后重写reduce方法 //第一个参数是key,第二个参数是集合。 //框架在map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组 ,调用一次reduce方法 protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { } }
(3).MyRunner
public class MyRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置文件
Configuration conf = new Configuration();
//获取一个作业
Job job = Job.getInstance(conf);
//设置整个job所用的那些类在哪个jar包
job.setJarByClass(MyRunner.class);
//本job使用的mapper和reducer的类
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
//指定reduce的输出数据key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//指定mapper的输出数据key-value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//指定要处理的输入数据存放路径
FileInputFormat.setInputPaths(job, new Path("hdfs://node1.cg:8020/user/cg/input"));
//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path("hdfs://node1.cg:8020/user/cg/output"));
//将job提交给集群运行
job.waitForCompletion(true);
}
}
八、Map过程
在src上,单击右键->New->Class->Name处填MyMapper->Finish,MyMapper的完整代码与解析如下:
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper{ //重写map这个方法 //mapreduce框架每读一行数据就调用一次该方法 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //具体业务逻辑就写在这个方法体中,而且我们业务要处理的数据已经被框架传递进来,在方法的参数中key-value //key是这一行数据的起始偏移量,value是这一行的文本内容 //1:切分名字,用空格隔开,前面的是孩子,后面的是父母 String child = value.toString().split(" ")[0]; String parent = value.toString().split(" ")[1]; //2:产生正序与逆序的key-value同时压入context context.write(new Text(child), new Text("-" + parent)); context.write(new Text(parent), new Text("+" + child)); } }
九、Reduce过程
在src上,单击右键->New->Class->Name处填MyMapper->Finish,MyReducer的完整代码与解析如下:
import java.io.IOException; import java.util.ArrayList; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer{ //继承Reducer之后重写reduce方法 //第一个参数是key,第二个参数是集合。 //框架在map处理完成之后,将所有key-value对缓存起来,进行分组,然后传递一个组 ,调用一次reduce方法 public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //1:创建两个List保存祖父母和孩子 ArrayList grandparent = new ArrayList (); ArrayList grandchild = new ArrayList (); //2:对各个values中的值进行处理,key的父母保存到祖父母列表中,key的子女保存在孩子孩子列表中 for (Text t : values) { String s = t.toString(); if (s.startsWith("-")) { grandparent.add(new Text(s.substring(1))); } else { grandchild.add(new Text(s.substring(1))); } } //3:再将grandparent与grandchild中的东西,一一对应输出。 for (int i = 0; i < grandchild.size(); i++) { for (int j = 0; j < grandparent.size(); j++) { context.write(new Text(grandchild.get(i) + " "), grandparent.get(j)); } } } }
十、执行MapReduce任务
在src上,单击右键->New->Class->Name处填MyRunner->Finish,MyRunner的完整代码与解析如下:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置文件
Configuration conf = new Configuration();
//获取一个作业
Job job = Job.getInstance(conf);
//设置整个job所用的那些类在哪个jar包
job.setJarByClass(MyRunner.class);
//本job使用的mapper和reducer的类
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//指定reduce的输出数据key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//指定mapper的输出数据key-value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//指定要处理的输入数据存放路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//将job提交给集群运行
job.waitForCompletion(true);
}
}
十一、实验结果
在Relation目录下新建target目录,导出Relation项目的jar包,jar包命名为Relation.jar.
在master节点输入hadoop jar命令提交作业到hadoop集群运行:
cd /course/hadoop/mr_pro/Relation/target hadoop jar Relation.jar mapreduce/relation/input mapreduce/relation/output #后面两个参数为HDFS中输入输出路径
运行结果如下:
使用如下命令查看计算结果:
# hadoop fs -ls mapreduce/relation/output # hadoop fs -cat mapreduce/relation/output/part-r-00000
运行结果如下:
可见结果符合预期。
十二、准备工作
1.新建项目
在本实验工作目录下创建文件夹Inverted。
在eclipse中依次点击:File->New->Project->Map/Reduce Project->Next。
在项目名称(Project Name)处填入Inverted,将工程位置设置为上述的文件夹,点击Finish。
2.准备 input.dat
在Inverted项目下分别创建data目录和target目录,从终端进入data目录,使用如下命令生成3个文本文件:1.txt,2.txt,3.txt
#mkdir ~/course/hadoop/mr_pro/Inverted/data/ #mkdir ~/course/hadoop/mr_pro/Inverted/target/ # cd ~/course/hadoop/mr_pro/Inverted/data/ # echo "MapReduce is simple" > 1.txt # echo "MapReduce is powerful and simple" > 2.txt # echo "Hello MapReduce bye MapReduce" > 3.txt # ls 1.txt 2.txt 3.txt
在eclipse的Inverted项目目录上,右键->Refresh效果图如下:
在master节点上执行如下命令,在HDFS上创建目录,并将3个文件上传至HDFS上:
#cd /course/hadoop/mr_pro/Inverted/data/ # hadoop fs -mkdir -p mapreduce/inverted/input # hadoop fs -put *.txt mapreduce/inverted/input # hadoop fs -ls mapreduce/inverted/input Found 3 items -rw-r--r-- 2 root supergroup 20 2020-02-07 12:09 mapreduce/inverted/input/1.txt -rw-r--r-- 2 root supergroup 33 2020-02-07 12:09 mapreduce/inverted/input/2.txt -rw-r--r-- 2 root supergroup 30 2020-02-07 12:09 mapreduce/inverted/input/3.txt
十三、Map过程
首先使用默认的 TextInputFormat 类对输入文本进行处理,得到文本中每行的偏移量及其内容。随后,Map过程分析输入的
存在两个问题,第一:
完整的Map过程代码如下:
import java.io.*; import java.util.StringTokenizer; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class MyMapper extends Mapper
在项目的src下面创建MyMapper类,并填入以上代码。
十四、Combine过程
将key值相同的value值累加,得到一个单词在文档中的词频,如图:
Combine过程的完整代码如下所示:
import java.io.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.Reducer; public class MyCombiner extends Reducer{ //1:声明输出的value变量 private Text info = new Text(); public void reduce(Text key,Iterable values,Context context) throws IOException, InterruptedException{ //2:声明计数变量sum并初始化为0,统计一个文件中每一个单词的出现次数 int sum = 0; for(Text value:values){ sum += Integer.parseInt(value.toString()); } //3:将从Map函数传过来的key值分割成单词和文件名保存在str中 String record = key.toString(); String[] str = record.split(" "); //4:新的key值设为单词,value设为“文件名+词频” key.set(str[0]); info.set(str[1]+":"+sum); context.write(key,info); } }
在src下创建MyCombiner类,并填入以上代码。
十五、Reduce过程
经过上述两个过程后,Reduce过程只需将相同key值的value值组合成倒排索引文件所需的格式即可,剩下的事就可以直接交给MapReduce框架进行处理了。
Reduce 过程的完整代码如下:
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class MyReducer extends Reducer{ private Text result = new Text(); public void reduce(Text key,Iterable values,Context context) throws IOException, InterruptedException{ String value =new String(); for(Text value1:values){ value += value1.toString()+" ; "; } result.set(value); context.write(key,result); } }
在src下创建MyReducer类,并填入以上代码。
十六、执行MapReduce过程
完整代码与解析如下:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//创建配置文件
Configuration conf = new Configuration();
//获取一个作业
Job job = Job.getInstance(conf);
//设置整个job所用的那些类在哪个jar包
job.setJarByClass(MyRunner.class);
//本job使用的mapper、combiner和reducer的类
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setCombinerClass(MyCombiner.class);
//指定reduce的输出数据key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//指定mapper的输出数据key-value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//指定要处理的输入数据存放路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定处理结果的输出数据存放路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//将job提交给集群运行
job.waitForCompletion(true);
}
}
在src下创建MyRunner类,并填入以上代码。
十七、实验结果
将项目导出为 Inverted.jar ,导出路径为当前项目的target目录下,
在master节点上切换到 jar 包所在目录,输入以下命令提交作业到集群运行:
# cd /course/hadoop/mr_pro/Inverted/target # hadoop jar Inverted.jar mapreduce/inverted/input mapreduce/inverted/output #后面两个参数为HDFS中输入输出路径
运行结果如下:
使用以下命令查看计算结果:
# hadoop fs -ls mapreduce/inverted/output # hadoop fs -cat mapreduce/inverted/output/part-r-00000
可见,结果无误。



