domain中存储的是继承WritableComparable的数据对象;
sort中实现的是局部排序;
totalsort中实现的是全局排序。
数据内容
sort1.txt(sort处理)
movie1 72 movie2 83 movie3 67 movie4 79 movie5 84 movie6 68 movie7 79 movie8 56 movie9 69 movie10 57 movie11 68
sort2.txt(totalsort处理)
93 239 231 23 22 213 613 232 614 213 3939 232 4546 565 613 231 231 2339 231 1613 5656 657 61313 4324 213 613 2 232 32 393 613 4535 61321 3942 453 6133 392 453 6131 322 452 232 393 455 3613 3939 3242
1. domain Movie
package hadoop_test.sort_test_08.domain; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; // 继承WritableComparable,可对其进行排序 public class Movie implements WritableComparable{ private String name; private int hot; @Override public void write(DataOutput out) throws IOException { out.writeUTF(name); out.writeInt(hot); } @Override public void readFields(DataInput in) throws IOException { this.name=in.readUTF(); this.hot=in.readInt(); } @Override public int compareTo(Movie o) { System.out.println("this.hot: " + this.hot +" "+ "o.hot: " + o.hot); System.out.println(o.hot-this.hot); // return this.hot-o.hot; // 由小到大排列 return o.hot-this.hot; // 由大到小排列 } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getHot() { return hot; } public void setHot(int hot) { this.hot = hot; } @Override public String toString() { return "Movie [name=" + name + ", hot=" + hot + "]"; } }
排序是MapReduce框架中最重要的操作之一。MapTask和ReduceTask都会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序的数据均会被排序,而不管逻辑上是否需要。默认是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率到达一定阈值后,再对缓冲区中的数据进行一次快速排序,将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大的文件;如果内存中文件大小或者数据超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
拓展资料:
Hadoop之排序
MapReduce怎么优雅地实现全局排序
五、MapReduce练习----学生成绩按照总成绩降序排列,总成绩相同依次按照语文成绩、数学成绩
目标:基于sort1.txt中电影评分,对其进行从大到小排序
SortDriver
package hadoop_test.sort_test_08.sort;
import hadoop_test.Utils_hadoop;
import hadoop_test.sort_test_08.domain.Movie;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the local-sort job: reads "name hot" lines, emits Movie keys,
 * and relies on the shuffle sort (Movie.compareTo) plus the default
 * identity reducer to produce output ordered by hot score, descending.
 */
public class SortDriver {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortDriver.class);
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(Movie.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Also declare the job's final output types: no reducer class is set,
        // so the identity reducer emits Movie/NullWritable — not the
        // LongWritable/Text defaults the job would otherwise advertise.
        job.setOutputKeyClass(Movie.class);
        job.setOutputValueClass(NullWritable.class);
        // NOTE(review): the article names the data file sort1.txt, but the
        // job reads sort.txt — confirm which file actually exists on HDFS.
        FileInputFormat.setInputPaths(job, new Path("/hadoop_test/sort_test/sort.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/sort_test/result1"));
        job.waitForCompletion(true);
    }
}
SortMapper
package hadoop_test.sort_test_08.sort; import hadoop_test.sort_test_08.domain.Movie; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class SortMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Movie movie = new Movie(); String line = value.toString(); String name = line.split(" ")[0]; int hot=Integer.parseInt(line.split(" ")[1]); movie.setName(name); movie.setHot(hot); // movie类作为key context.write(movie, NullWritable.get()); } }
输出结果
目标:按照不同数位将数据划分给三个不同的Reducer来处理,并且每个Reducer内部都整体有序。
package hadoop_test.sort_test_08.totalsort;
import hadoop_test.Utils_hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver for the total-sort job: three reducers, each receiving a disjoint
 * numeric range via TotalSortPartitioner, so the concatenated part files
 * form one globally sorted result.
 */
public class TotalSortDriver {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");

        Job totalSortJob = Job.getInstance(new Configuration());
        totalSortJob.setJarByClass(TotalSortDriver.class);

        // Mapper tokenizes numbers; reducer totals the per-number counts.
        totalSortJob.setMapperClass(TotalSortMapper.class);
        totalSortJob.setReducerClass(TotalSortReducer.class);
        totalSortJob.setMapOutputKeyClass(IntWritable.class);
        totalSortJob.setMapOutputValueClass(IntWritable.class);
        totalSortJob.setOutputKeyClass(IntWritable.class);
        totalSortJob.setOutputValueClass(IntWritable.class);

        // One reducer per digit-count bucket; the custom partitioner routes
        // 1-2 digit, 3 digit, and larger keys to partitions 0, 1 and 2.
        totalSortJob.setNumReduceTasks(3);
        totalSortJob.setPartitionerClass(TotalSortPartitioner.class);

        FileInputFormat.setInputPaths(totalSortJob, new Path("/hadoop_test/sort_test/sort2.txt"));
        FileOutputFormat.setOutputPath(totalSortJob, new Path("/hadoop_test/sort_test/result2"));

        totalSortJob.waitForCompletion(true);
    }
}
TotalSortMapper
package hadoop_test.sort_test_08.totalsort; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class TotalSortMapper extends MapperTotalSortPartitioner{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] data = line.split(" "); for(String num:data){ context.write(new IntWritable(Integer.parseInt(num)),new IntWritable(1)); } } }
package hadoop_test.sort_test_08.totalsort; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; public class TotalSortPartitioner extends PartitionerTotalSortReducer{ @Override public int getPartition(IntWritable key, IntWritable value, int numPartitions) { // key.toString().matches("[0-9]")匹配10一下数子key.toString().matches("[0-9][0-9]匹配0-100 if(key.toString().matches("[0-9]")|key.toString().matches("[0-9][0-9]")){ return 0; // 个位数和两位数在 part-r-00000 }else if(key.toString().matches("[0-9][0-9][0-9]")){ return 1; // 三位数在 part-r-00001 }else{ return 2; // 四位数在 part-r-00002 } } }
package hadoop_test.sort_test_08.totalsort; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class TotalSortReducer extends Reducer{ @Override protected void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException { int result=0; for(IntWritable value:values){ result=result+value.get(); } context.write(key, new IntWritable(result)); } }
输出结果



