修改pom.xml文件
本地编写java代码。pom.xml 内容(依据残留标记重建):

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>hdfs_upload</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>hdfs_upload</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.1.3</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.25</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.4.3</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
```
对word.txt文件进行每个单词出现次数的统计
word.txt文件内容:
zqk zqk mfh mfh zhangqiongke zhangqiongke
在项目目录下src/main/java下创建com.example文件夹
在项目目录下src/main/java/com.example下创建
- WordCountMain
- WordCountMapper
- WordCountReducer
package com.example; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends MapperWordCountReducer{ @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { Text k = new Text(); IntWritable v = new IntWritable(1); // 将Text类型⽂本转换为String类型 String text = value.toString(); // 分词,存储到String数组 String[] words = text.split(" "); // 输出 for (String word : words) { // k.set(word):将word装载到k中 k.set(word); // 将map()函数输出的键值对写⼊到MapReduce上下⽂环境 context.write(k, v); } } }
package com.example; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends ReducerWordCountMain{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { int sum=0; IntWritable v = new IntWritable(); for (IntWritable count : values) { // count.get():获取 count 值 sum += count.get(); } // 将sum值装载到v中 v.set(sum); // 将reduce()函数输出的键值对写⼊到MapReduce上下⽂环境 context.write(key,v); } }
package com.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCountMain {
public static void main(String[] args) throws Exception{
// 创建hadoop集群配置对象
Configuration conf = new Configuration();
// 创建⼀个 job 实例
Job job = Job.getInstance(conf,"word count");
// 设置主类
job.setJarByClass(WordCountMain.class);
// 设置 job 的 mapper 类
job.setMapperClass(WordCountMapper.class);
// 设置 Mapper 的输出键类型
job.setMapOutputKeyClass(Text.class);
// 设置 Mapper 的输出值类型
job.setMapOutputValueClass(IntWritable.class);
// 设置 job 的 reducer 类
job.setReducerClass(WordCountReducer.class);
// 设置 Reducer 的输出键类型
job.setOutputKeyClass(Text.class);
// 设置 Reducer 的输出值类型
job.setOutputValueClass(IntWritable.class);
// 指定 job 的输⼊⽂件路径
FileInputFormat.setInputPaths(job, new Path("C:\Users\11150\Desktop\word.txt"));
// 指定 job 的输出⽂件路径
FileOutputFormat.setOutputPath(job, new Path("C:\Users\11150\Desktop\newword.txt"));
Path path = new Path("C:\Users\11150\Desktop\newword.txt");
FileSystem fs = FileSystem.get(conf);
// 判断⽬录是否存在,如果存在,删除该⽬录
if (fs.exists(path)) {
fs.delete(path,true);
}
// 等待任务结束
System.exit(job.waitForCompletion(true)?0:1);
}
}
运行结果



