Hadoop MapReduce is a software framework for easily writing applications that run on large clusters of thousands of commodity machines, processing multi-terabyte datasets in parallel in a reliable, fault-tolerant manner.
The Map class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of text
        String line = value.toString();
        // Split the line on spaces
        String[] words = line.split(" ");
        // Process each word
        for (String word : words) {
            // Wrap the word in a Text
            Text wordText = new Text(word);
            // Wrap the count 1 in an IntWritable
            IntWritable outValue = new IntWritable(1);
            // Emit the (word, 1) pair
            context.write(wordText, outValue);
        }
    }
}
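For intuition: given a hypothetical input line "hello world hello", the map method above emits one (word, 1) pair per token:

(hello, 1)
(world, 1)
(hello, 1)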
The Reduce class
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for this word
        int sum = 0;
        for (IntWritable number : values) {
            sum += number.get();
        }
        // Emit the (word, total) pair
        context.write(key, new IntWritable(sum));
    }
}
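Between the map and reduce phases, the framework sorts and groups the map output by key, so each reduce call receives one word together with all of its 1s. Continuing the hypothetical sample above:

hello -> [1, 1]  =>  (hello, 2)
world -> [1]     =>  (world, 1)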
The driver class
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class WordCount {
public static void main(String[] args) throws Exception {
    // Create the configuration object
    Configuration conf = new Configuration();
    // Create the job object
    Job job = Job.getInstance(conf);
    // Let Hadoop locate the jar that contains this driver
    job.setJarByClass(WordCount.class);
    // Specify the Mapper class
    job.setMapperClass(WordCountMap.class);
    // Key type of the map output
    job.setMapOutputKeyClass(Text.class);
    // Value type of the map output
    job.setMapOutputValueClass(IntWritable.class);
    // Input path for the job
    FileInputFormat.setInputPaths(job, new Path("C:/Users/夕七/Desktop/123.txt"));
    // Specify the Reducer class
    job.setReducerClass(WordCountReduce.class);
    // Key type of the final (reduce) output
    job.setOutputKeyClass(Text.class);
    // Value type of the final (reduce) output
    job.setOutputValueClass(IntWritable.class);
    // Output directory; it must not already exist
    FileOutputFormat.setOutputPath(job, new Path("C:/Users/夕七/Desktop/234.txt"));
    // Submit the job and wait for completion
    job.waitForCompletion(true);
}
}
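A minimal sketch of launching the job, assuming the three classes are compiled against the Hadoop client libraries and packaged into a jar (the name wordcount.jar is hypothetical). Since the input and output paths are hardcoded in the driver, no arguments are needed, and the output directory must not exist before the run:

hadoop jar wordcount.jar WordCount

With the default TextOutputFormat, the results appear in a part-r-00000 file under the output path, one tab-separated word/count pair per line.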



