在IDEA编辑工具创建一个Maven项目,并在pom.xml文件导入如下依赖:
pom.xml(注意hadoop版本需要与windows上安装的hadoop版本一致,否则在读取文件路径时会出错)
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.7</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
注意!!!(所有,导入的包不要导错了)
注意!!!(所有,导入的包不要导错了)
注意!!!(所有,导入的包不要导错了)
重要的事情说三遍,导包特别容易出错!!!
编写Map阶段代码,创建WordCountMapper类,添加如下代码:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper{ private Text OutKey = new Text(); private IntWritable OutValue = new IntWritable(1); //map方法,每行数据调用一次 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1、获取一行数据 //value是Text类型,先转换为String //数据为 hgl hgl String line = value.toString(); //数据转换为 //hgl //hgl String[] words = line.split(" "); //循环写出words中的word for (String word : words) { //封装OutKey OutKey.set(word); //写出 //context.write()方法将Map阶段的输出数据传入到Reduce阶段 //数据转变为(hgl,1)(hgl,1) context.write(OutKey,OutValue); } } }
编写Reduce阶段代码,创建WordCountReducer类,添加如下代码:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer{ private IntWritable OutValue = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //从Map阶段进来的数据(hgl,1)(hgl,1)==>(hgl,(1,1)) int sum = 0; for (IntWritable value : values) { sum += value.get(); } //将value传入OutValue中 OutValue.set(sum); //输出 //(hgl,2) context.write(key,OutValue); } }
编写Driver类代码,创建WordCountDriver类,添加如下代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1 获取Job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2 设置jars包路径
job.setJarByClass(WordCountDriver.class);
//3 关联map和reduce
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4 设置map输出的k,v数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5 设置最终输出的k,v数据类型(因为有些没有reduae阶段,所以不是设置reduce的k,v类型)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6 设置输入和输出路径
FileInputFormat.setInputPaths(job,new Path("F:\inputword\hello.txt"));
//注意输出路径不能提前存在
FileOutputFormat.setOutputPath(job,new Path("F:\outputword"));
//7 提交Job
boolean result = job.waitForCompletion(true);//传入true,打印更多job信息
//三元运算符 成功返回0,失败返回1
System.exit(result ? 0 : 1);
}
}
数据源:
输出结果:



