MapReduce程序主要由三个类组成:Mapper、Reducer和Driver,它们像八股文一样有固定的写法格式。
pom.xml:
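<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>
    <!-- 注意: log4j-core 2.8.2 受 CVE-2021-44228 (Log4Shell) 影响, 建议升级到 2.17.1 或更高版本 -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.2</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass>com.hadoop.wordcount.WordCountDriver</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>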
Mapper:
package com.hadoop.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.regex.Pattern;

/**
 * Map phase of word count: splits each input line into words and emits a
 * {@code (word, 1)} pair for every word found.
 *
 * <p>Generic parameters of {@link Mapper}: input key {@link LongWritable} (not used here),
 * input value {@link Text} (one line of input), output key {@link Text} (the word),
 * output value {@link IntWritable} (the count contributed by one occurrence).
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /** Runs of whitespace (spaces, tabs, ...) separate words; compiled once, reused per record. */
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    /** Reused output key so a new Text is not allocated for every emitted word. */
    private final Text k = new Text();

    /** Constant output value: each occurrence of a word counts as 1. */
    private final IntWritable v = new IntWritable(1);

    /**
     * Emits {@code (word, 1)} for each whitespace-separated word in {@code value}.
     *
     * @param key     input key (ignored)
     * @param value   one line of input text
     * @param context MapReduce context used to emit output pairs
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        for (String word : WHITESPACE.split(line)) {
            // Leading whitespace (or an empty line) yields an empty token; it is not a word.
            if (word.isEmpty()) {
                continue;
            }
            k.set(word);
            context.write(k, v);
        }
    }
}
Reducer:
package com.hadoop.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reduce phase of word count: sums all counts emitted for a word and writes
 * {@code (word, total)}.
 *
 * <p>Generic parameters of {@link Reducer}: input key/value {@code (Text, IntWritable)}
 * match the mapper's output; output key/value are also {@code (Text, IntWritable)}.
 */
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    /** Reused output value so a new IntWritable is not allocated for every key. */
    private final IntWritable v = new IntWritable();

    /**
     * Sums the counts for one word and emits the total.
     *
     * @param key     the word
     * @param values  all counts emitted for {@code key}
     * @param context MapReduce context used to emit the result
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        // Accumulate every count emitted for this word.
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        context.write(key, v);
    }
}
Driver:
package com.hadoop.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.CombineSequenceFileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Entry point that configures and runs the word-count MapReduce job.
 */
public class WordCountDriver {

    /**
     * Configures the word-count job, submits it, and waits for it to finish.
     *
     * @param args {@code args[0]} is the input path and {@code args[1]} is the output
     *     directory (Hadoop's FileOutputFormat refuses to run if it already exists)
     * @throws IOException            if job setup or submission fails
     * @throws ClassNotFoundException if job classes cannot be resolved
     * @throws InterruptedException   if interrupted while waiting for the job
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            System.exit(2);
        }

        // 1. Create the job from a default configuration.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. Ship the jar that contains this class to the cluster.
        job.setJarByClass(WordCountDriver.class);

        // 3. Wire up the map and reduce implementations.
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);

        // 4. Key/value types emitted by the mapper.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Key/value types of the final (reducer) output.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Input path(s) of the job.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 7. Output directory for the job's results.
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 8. Submit, print progress while waiting, and map success/failure to the exit code.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
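打包成jar在集群上运行:先在pom.xml中引入打包插件(maven-assembly-plugin),再进行以下操作:执行 `mvn clean package` 生成带依赖的 `*-jar-with-dependencies.jar`,将其上传到集群后运行 `hadoop jar <jar文件> com.hadoop.wordcount.WordCountDriver <输入路径> <输出路径>`。注意:输出路径在运行前必须不存在,否则作业会直接失败。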



