一、Mapper源码
package org.apache.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.task.MapContextImpl; @InterfaceAudience.Public @InterfaceStability.Stable public class Mapper{ public abstract class Context implements MapContext { } protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }
1.context抽象方法是连接上下文作用,用于连接Mapper和Reducer,是交互桥梁
2.setup()方法和cleanup()方法都是Called Once,只调用一次
3.map()方法一般都会被重写(run()方法通常不需要重写)。run()方法中会循环调用map()方法,map()方法会被每一对k,v所调用一次,意味着map()会被调用很多次。
二、WordCountMapper类
Mapper源码中的泛型形参是 &lt;KEYIN, VALUEIN, KEYOUT, VALUEOUT&gt;(原文尖括号内容在网页提取时丢失)
例如
lz lz
lili lili
输入的应该是 (0, "lz lz") 和 (6, "lili lili")(key是该行起始位置的字节偏移量,value是整行文本)
输出的应该是 (lz,1);(lz,1);(lili,1);(lili,1)
所以
KEYIN:每行起始位置的字节偏移量 LongWritable
VALUEIN:一行文本 Text
KEYOUT: 单词 Text
VALUEOUT: 数量 IntWritable
package com.atguigu.mapreduce.wordcount2; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper{ Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取一行 String line = value.toString(); //切割单词 String[] words = line.split(" "); //输出 for(String word:words){ k.set(word); context.write(k,v); } } }
重写map()方法
三、Reducer源码
package org.apache.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.task.annotation.Checkpointable; import java.util.Iterator; @Checkpointable @InterfaceAudience.Public @InterfaceStability.Stable public class Reducer{ public abstract class Context implements ReduceContext { } protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } @SuppressWarnings("unchecked") protected void reduce(KEYIN key, Iterable values, Context context ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value); } } protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context); // If a back up store is used, reset it Iterator iter = context.getValues().iterator(); if(iter instanceof ReduceContext.ValueIterator) { ((ReduceContext.ValueIterator )iter).resetBackupStore(); } } } finally { cleanup(context); } } }
setup()、cleanup()方法类似于Mapper中的方法
reduce()方法传入的形参是 KEYIN key, Iterable&lt;VALUEIN&gt; values, Context context
(框架把同一个key的所有value聚合到一个Iterable中,一次性传给reduce())
四、WordCountReducer
package com.atguigu.mapreduce.wordcount2;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Word-count reducer. Generic arguments (lost in the pasted copy) restored:
 * KEYIN/KEYOUT = Text (the word), VALUEIN = IntWritable (the per-occurrence 1s
 * emitted by the mapper), VALUEOUT = IntWritable (the total count).
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Accumulator and reusable output object (reset on every reduce call).
    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the 1s emitted for this word.
        sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Emit (word, total).
        v.set(sum);
        context.write(key, v);
    }
}
Mapper中的输出即是Reducer中的输入,即为Text,IntWritable
五、Driver驱动类
package com.atguigu.mapreduce.wordcount2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the word-count job: wires up the Mapper and Reducer, declares
 * the key/value types, sets the input/output paths from the command line,
 * and submits the job.
 */
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
//1. Obtain the configuration and the Job instance.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2. Locate the jar containing this driver class (via reflection).
job.setJarByClass(WordCountDriver.class);
//3. Attach the Mapper and Reducer classes.
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4. Declare the Mapper's output key/value types.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5. Declare the job's FINAL output key/value types.
// BUG FIX: the original called setMapOutputKeyClass here a second time,
// so the final output key type was never declared.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6. Input/output paths from args; the output path must not already exist.
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//7. Submit the job, wait for completion, exit 0 on success.
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);
}
}
分为七步:
一、获取配置信息以及job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
二、关联Driver程序的jar
job.setJarByClass(WordCountDriver.class);此方法通过这个类来反射当前这个jar包在什么位置
三、关联Mapper和Reducer程序的jar
job.setMapperClass
job.setReducerClass
四、设置Mapper输出的kv类型
job.setMapOutputKeyClass()
job.setMapOutputValueClass()
五、设置最终输出的kv类型
job.setOutputKeyClass()
job.setOutputValueClass()
六、设置输入和输出路径
FileInputFormat.setInputPaths(job,new Path(args[0]))
FileOutputFormat.setOutputPath(job,new Path(args[1]))
用args的形式来实现动态输出
输出路径不能存在
七、提交job
boolean result = job.waitForCompletion(true);
System.exit(result?0:1);



