// ===== WordCountMapper.java =====
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Word-count mapper: splits each input line into whitespace-separated tokens
 * and emits one {@code (word, 1)} pair per token.
 *
 * <p>Input key is the byte offset of the line (LongWritable); input value is
 * the line text (Text). Output is (Text word, IntWritable 1).
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused output objects — avoids allocating per record in a hot path.
    private final Text txt = new Text();
    // Every emitted count is 1, so the value can be set once and reused.
    private final IntWritable intWritable = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // split on runs of whitespace so consecutive spaces/tabs don't
        // produce empty tokens (split(" ") would).
        String[] words = value.toString().split("\\s+");
        for (String word : words) {
            if (word.isEmpty()) {
                continue; // leading whitespace yields one empty first token
            }
            txt.set(word);
            context.write(txt, intWritable);
        }
    }
}
// ===== WordCountReduce.java =====
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Word-count reducer: sums the per-word 1-counts emitted by
 * {@code WordCountMapper} and writes {@code (word, totalCount)}.
 *
 * <p>Input is (Text word, Iterable&lt;IntWritable&gt; counts); output is
 * (Text word, LongWritable total).
 */
public class WordCountReduce extends Reducer<Text, IntWritable, Text, LongWritable> {
    // Reused output value — avoids allocating a LongWritable per key.
    private final LongWritable total = new LongWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Accumulate in a long: the output type is LongWritable, and an int
        // accumulator could overflow on very large datasets.
        long count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        total.set(count);
        context.write(key, total);
    }
}
// ===== WordCountDriver.java =====
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
//import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver for the word-count MapReduce job.
 *
 * <p>Usage: {@code WordCountDriver <input path> <output path>}. The output
 * directory is deleted first if it already exists (Hadoop refuses to run a
 * job whose output path exists). Exits with status 0 on job success,
 * non-zero on failure or bad arguments.
 */
public class WordCountDriver {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: WordCountDriver <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountDriver.class);

        // Mapper and its output types (differ from the final output types).
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer and the job's final output types.
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Input path (local path or hdfs:// URI) taken from the command line.
        Path inputPath = new Path(args[0]);
        FileInputFormat.setInputPaths(job, inputPath);

        // Remove a pre-existing output directory so the job can run again.
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(outputPath.toUri(), conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Propagate job success/failure as the process exit code; the
        // original ignored the boolean and always exited 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}