一、回顾
Shuffle机制:
一个mapper将数据写入到环形缓冲区(在堆内部的一块内存,默认100M),写入数据时一边写元信息(数据在环形缓冲区中的索引,分区号等),一边写数据本身(左边写元数据,右边写数据),写到80%认为环形缓冲区已满,出现溢写(将数据写到磁盘上),当这80%数据写到磁盘的过程中,需要分区和排序,这两件事情并不是一瞬间就完成,map在此时依然向环形缓冲区中写数据,如果map在写20%数据时,80%数据没有处理完,map会等待溢写完成后继续将数据写入环形缓冲区。
在环形缓冲区中的数据都是序列化好的数据,在区内二次排序(先按照分区号排序,后按照块大小排序)需要拿到两个数据的值,也就是将两个数据反序列化,compare进行比较,如果需要交换,不会交换数据本身,而是交换两个数据的索引,因为数据的长度不确定,索引的长度确定,之后按照索引的顺序写出到磁盘,这时溢写出的数据就是一份排序好的文件。
最终会有多个溢写文件,将多个文件合并成一个文件(归并),如果采用分区,则按照分区进行归并,各区归并各区,经过combiner的合并,压缩后将数据写入到磁盘上,文件分区且区内有序,这就是整个MapTask的过程。
每个Reduce从MapTask中下载自己对应分区的数据,会将数据一边归并一边下载到内存缓冲中,如果缓冲不够了,就溢写到磁盘上。可以根据集群的性能,自定义调节并行下载的数量,全部归并之后按照相同的key进行分组,此为Shuffle的全部流程。
RM,NM不负责具体任务的运行,如果有一个job被提交,RM在NM中启动一个AM来进行管理。RM,NM只负责监控集群的全部资源,NM会将自己的资源以容器的形式发送出去。
各阶段完成的任务:
InputFormat:文件变成KV
Shuffle:整理数据
OutputFormat:KV变成文件,接受reduce输出的数据,这些数据要以什么样的形式持久化,要以什么样的形式使用这份数据,都是由OF负责。
OutputFormat数据输出:
默认使用的是TextOutputFormat,它将每条记录写成文本行,每行包含key的toString,制表符,value的toString三部分。如果不行使用默认的,也可以自定义OF。
需求:将log.txt文件中包含baidu的网站输出到baidu.log,不包含的输入到other.log。
public class MyOutputFormat extends FileOutputFormat{ @Override public RecordWriter getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { return new MyRecordWriter(taskAttemptContext); } } public class MyRecordWriter extends RecordWriter { FSDataOutputStream baidu = null; FSDataOutputStream other = null; public MyRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException { Configuration configuration = taskAttemptContext.getConfiguration(); String outDir = configuration.get(FileOutputFormat.OUTDIR); FileSystem fileSystem = FileSystem.get(configuration); baidu = fileSystem.create(new Path(outDir + "/baidu.log")); other = fileSystem.create(new Path(outDir + "/other.log")); } @Override public void write(LongWritable key, Text value) throws IOException, InterruptedException { String line = value.toString() + "n"; if(line.contains("baidu")){ //写入baidu.log baidu.write(line.getBytes()); }else{ //写入other.log other.write(line.getBytes()); } } @Override public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { } } public class OutputDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(OutputDriver.class); job.setOutputFormatClass(MyOutputFormat.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean b = job.waitForCompletion(true); System.exit(b?0:1); } }
join多种应用:reduce join
需求:将以下两张图中的数据合并成第三张图。
OrderBean类:
public class OrderBean implements WritableComparable{ private String id; private String pid; private int amount; private String pname; @Override public String toString() { return id + "t" + pname + "t" + amount; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getPid() { return pid; } public void setPid(String pid) { this.pid = pid; } public int getAmount() { return amount; } public void setAmount(int amount) { this.amount = amount; } public String getPname() { return pname; } public void setPname(String pname) { this.pname = pname; } @Override public int compareTo(OrderBean o) { int i = this.pid.compareTo(o.pid); if(i != 0){ return i; }else { return o.pname.compareTo(this.pname); } } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(pid); dataOutput.writeUTF(id); dataOutput.writeInt(amount); dataOutput.writeUTF(pname); } @Override public void readFields(DataInput dataInput) throws IOException { this.id = dataInput.readUTF(); this.pid = dataInput.readUTF(); this.amount = dataInput.readInt(); this.pname = dataInput.readUTF(); } }
OrderMapper类:
public class OrderMapper extends Mapper{ private OrderBean order = new OrderBean(); private String filename; @Override protected void setup(Context context) throws IOException, InterruptedException { //获取输入数据的文件名 FileSplit fs = (FileSplit)context.getInputSplit(); filename = fs.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //切分一行 String line = value.toString(); String[] fields = line.split("t"); //封装数据,按照数据来源不同(根据文件名)分别封装 if("order.txt".equals(filename)){ //封装order order.setId(fields[0]); order.setPid(fields[1]); order.setAmount(Integer.parseInt(fields[2])); //需要将字符串转换成int order.setPname(""); //不用的数据设置为空,因为最终需要将数据写出去 }else{ //封装pd order.setPid(fields[0]); order.setPname(fields[1]); order.setAmount(0); //不用的数据,数字类型封装0 order.setId(""); } //写出数据 context.write(order,NullWritable.get()); } }
OrderReducer类
public class OrderReducer extends Reducer{ @Override protected void reduce(OrderBean key, Iterable values, Context context) throws IOException, InterruptedException { //获取迭代器 Iterator iterator = values.iterator(); //迭代第一组数据 iterator.next(); //此时指针指向第一组数据,第一行数据已经读到,当第一次迭代时,指针是不会动的 String pname = key.getPname(); //这两行语句交换顺序不会影响到最终的结果 //迭代剩下的数据,写出并输出 while(iterator.hasNext()){ iterator.next(); key.setPname(pname); context.write(key,NullWritable.get()); } } }
分组比较器:
public class OrderComparator extends WritableComparator {
//先将比较器反序列化,反序列化就需要空对象,如果参数为true,父类就会生成两个新OrderBean对象,否则不会生成
//使用新的OrderBean对象完成反序列化过程
protected OrderComparator() {
super(OrderBean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean oa = (OrderBean) a;
OrderBean ob = (OrderBean) b;
return oa.getPid().compareTo(ob.getPid());
}
}
Driver:
public class OrderDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(OrderDriver.class);
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
job.setGroupingComparatorClass(OrderComparator.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean b = job.waitForCompletion(true);
System.exit(b?0:1);
}
}



