需求
在给定的文本文件中统计输出每一个单词出现的总次数
(1)文本数据:hello.txt
ss ss cls cls jiao banzhang xue hadoop
(2)期望输出数据
banzhang 1 cls 2 hadoop 1 jiao 1 ss 2 xue 11、 先创建Maven工程并添加所需依赖:
2、 在Resource 目录下创建 log4j2.xml文件并填入junit junit 4.12 org.apache.logging.log4j log4j-slf4j-impl 2.12.0 org.apache.hadoop hadoop-client 3.1.3
3、 按照 MapReduce 编程规范,分别编写 Mapper,Reducer,Driver。 3.1 WCMapper
WCMapper 负责整理数据,每次读入一行数据,输出每个单词。形如
aaa 1
public class WCMapper extends Mapper3.2 WCReducer{ private Text outKey=new Text(); private LongWritable outValue =new LongWritable(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1.将Text转成String(可以使用StringAPI) String line = value.toString(); //将数据按“ ”进行分割并存入数据。 String[] words = line.split(" "); //写出数组中的每个词 for (String word : words) { //设置key,value值 outKey.set(word); outValue.set(1); //将key,value写出去 context.write(outKey,outValue); } } }
WCReducer 每次读入Map传入的一组数据,例如 key 为"aaa"的数据有两条,WCReducer就会一次读入所有 key 为"aaa"的数据。
aaa 1
aaa 1
WCReducer 的目标就是统计 key 为"aaa"的数据条数。
输出:
aaa 2
public class WCReduce extends Reducer3.3 WCDriver 3.3.1 生成jar包并放到服务器上运行的写法(WCDriver){ private LongWritable outValue = new LongWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable value : values) { sum += value.get(); } outValue.set(sum); context.write(key,outValue); } }
public class WCDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1 获取配置信息以及封装任务
Configuration conf =new Configuration();
Job job =Job.getInstance(conf);
//2 设置Jar加载路径--将main方法所在的类传过去(如果是本地运行可以不写)
job.setJarByClass(WCDriver.class);
//3 设置map和reduce类
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReduce.class);
//4 设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//6 设置数据输入输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//7 提交
boolean b =job.waitForCompletion(true);
System.exit(b?0:1);
}
}
后接第4节。
,生成jar包并将jar包放到服务器上运行
3.3.1 在Windows上向集群提交任务(WCDriver2)相比上个 WCDriver 中的代码,WCDriver2 多了对 conf 变量添加的一些配置。
public class WCDriver2 {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1 获取配置信息以及封装任务
Configuration conf =new Configuration();
//设置在集群运行的相关参数-设置HDFS,NAMENODE的地址
conf.set("fs.defaultFS", "hdfs://hadoop102:8020");
//指定MR运行在Yarn上
conf.set("mapreduce.framework.name","yarn");
//指定MR可以在远程集群运行
conf.set("mapreduce.app-submission.cross-platform","true");
//指定yarn resourcemanager的位置
conf.set("yarn.resourcemanager.hostname", "hadoop103");
Job job =Job.getInstance(conf);
//2 设置Jar加载路径--将main方法所在的类传过去(如果是本地运行可以不写)
// job.setJarByClass(WCDriver2.class);
job.setJar("D:\Study_Code\MRDemo\target\MRDemo-1.0-SNAPSHOT.jar");
//3 设置map和reduce类
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReduce.class);
//4 设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//6 设置数据输入输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//7 提交
boolean b =job.waitForCompletion(true);
System.exit(b?0:1);
}
}
注意配置的端口要与服务器Hadoop配置文件中的一致。
4、 将 jar 包放至服务器上运行1、生成jar包
2、将jar包丢到服务器上
3、运行 jar 包中的WCDriver类
hadoop jar MRDemo-1.0-SNAPSHOT.jar com.atguigu.wordcount.WCDriver /input/hello.txt /output
4、打开网页端 hadoop102:9870 确认结果。



