配置HDFS的StaticUser
配置完成之后,可以在浏览器上实现对集群的管理(创建文件夹,删除文件夹等)
cd /opt/module/hadoop-3.1.3/etc/hadoop/ vim core-site.xmlxsync core-site.xml stop-dfs.sh start-dfs.sh hadoop.http.staticuser.user hike
1、框架原理回顾
配置HDFS的StaticUser
修改core-site.xml文件的配置。
InputFormat模块:负责把输入数据变为KV值,第一个工作负责数据的切片,考虑将数据分为几份,分完份之后,每个切片会启动一个MapTask并行处理(分布式),处理完成之后,对于每个MapTask,又会调用InputFormat把每一个切片的数据编程KV值,把KV值输入到Mapper中。其中有很多的InputFormat,也自定义了一个InputFormat。
job提交流程:job在提交之前,在客户端需要做出一些准备,重点是向临时文件夹提交了三个文件,第一个是需要使用到的jar包,第二个是切片信息,第三个是job的配置XML文件。
几个概念辨析:
Map阶段:是一个概念性的描述,在这个阶段,实际执行的MapTask,Map阶段的流程是由MapTask控制的(MapTask.run)。
MapTask.run:执行的Map阶段,在run中会创建Mapper的对象,执行Mapper的run方法。
Mapper:在MapTask中会调用自定义Mapper的map方法,也就是Mapper.map。
Mapper.map:一个方法。
二、shuflle机制
从Mapper将数据写出去一直到Reducer将数据写进来之间的阶段都是Shuffle,也就是MapTask的后半部分和ReducerTask的前半部分。在这期间,数据发生了非常关键的变化,Mapper出去的数据是无序的,而在Reducer的KV值有分组的,也就是说,在shuffle阶段,其将数据进行了归纳整理,分好组了,分组通过排序实现。快速排序是最快的,但其要求数据全部都在内存中一次排完,对于大数据环境下是不现实的。框架的排序将待排序的数据分成很多份,按份进行排序,份内有序,称为局部排序。块内使用快排,接下来需要归并(将多个有序的块合成一块有序的块),归并并不需要将所有排序的数据都放到内存中,只需要一块很小的缓冲区就可以完成归并过程。
核心为三次排序:在内存中的快排,之后的归并,最后的归并,一次做的事情分三次做,花费的时间更多,使用的资源更少,空间换时间,永远不变的定理。
可能存在的问题:在map处理完成数据之后,之后需要将数据汇总到reduce中进行处理,在汇总过程中,reduce会把所有的数据放到本地进行一次归并排序,reduce要处理的数据量十分之大,并行处理是解决此问题的一个方法,也就是启动多个reduce来处理数据。
那么reduce的数量如何决定呢:在map中,数据被切成几片,就有几个map来处理数据,但reduce的数量是通过setNumReduceTask()方法,人为设定的,具体设置几个,根据实际情况灵活变动。
每一个reduce处理哪几个map文件:
默认的分区规则:(key.hashcode() & Integer.MAX_VALUE) % numReduceTasks。相同的key一定会进入到相同的reduce,& Integer.MAX_VALUE的目的是去负号,&运算规则:全是1为1,其余为0。
此做法有一个隐患:数据倾斜(如以A开头的字母数量非常多,交给第一个reduce,B开头的一个,交给第二个reduce,C开头的一个,交给第三个reduce)会造成第一reduce的任务十分之重。可以使用自定义倾斜的方式解决数据倾斜的问题。
如何自定义分区:
需求:将统计结果按照手机归属地不同省份输出到不同文件中 , 136/137/128/139开头的手机号分到独立的4个文件中,其他开头的放到一个文件中
package com.hike.mr.partitioner; import com.hike.mr.flow.FlowBean; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class MyPartitioner extends Partitioner{ public int getPartition(Text text, FlowBean flowBean, int numPartitions) { switch (text.toString().substring(0,3)){ case "136": return 0; case "137": return 1; case "138": return 2; case "139": return 3; default: return 4; } } }
新建Driver类,添加以下两行语句,其余部分与其他Driver一样。
//设置reduce分区数量
job.setNumReduceTasks(5);
//使用自定义的组件而不是默认的组件
job.setPartitionerClass(MyPartitioner.class);
WritableComparable自定义排序:框架会自动按照key进行排序,现在想将其他数据(FlowBean)放到key的位置进行排序,需要实现WritableComparable接口,让默认的Comparator实现compareTo方法来进行比较。
FlowBean实现接口,实现方法,Mapper封装数据,Reducer写出数据。
需求:将输出数据再按照总流量将需进行排序。
FlowBean类
//想要在框架中使用,需要实现接口 public class FlowBean implements WritableComparable{ private long upFlow; private long downFlow; private long sumFlow; @Override public String toString(){ return upFlow + "/t" + downFlow + "/t" + sumFlow; } public void set(long upFlow,long downFlow){ this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(upFlow); dataOutput.writeLong(downFlow); dataOutput.writeLong(sumFlow); } public void readFields(DataInput dataInput) throws IOException { this.upFlow = dataInput.readLong(); this.downFlow = dataInput.readLong(); this.sumFlow = dataInput.readLong(); } @Override public int compareTo(FlowBean o) { // if(this.sumFlow < o.sumFlow){ // return 1; // }else if(this.sumFlow == o.sumFlow){ // return 0; // }else{ // return -1; // } return Long.compare(o.sumFlow,this.sumFlow); } }
Mapper类
//默认根据key进行排序,将FlowBean放在key的位置上 public class CompareMapper extends Mapper{ private Text phone = new Text(); private FlowBean flow = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("t"); phone.set(fields[0]); flow.setUpFlow(Long.parseLong(fields[1])); //将字符串转成整数 flow.setDownFlow(Long.parseLong(fields[2])); flow.setSumFlow(Long.parseLong(fields[3])); } }
Reducer类
public class CompareReducer extends Reducer{ @Override protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException { for (Text value : values) { //先写value,再写key,因为reduce收到数据是,流量在前,手机号在后,反过来输出 context.write(value,key); } } }
Driver类
public class CompareDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(CompareDriver.class);
job.setMapperClass(CompareMapper.class);
job.setReducerClass(CompareReducer.class);
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(FlowBean.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
combiner合并:
Combiner默认不开启,因为可能导致Mapper端和Reducer端的最终结果不一致。
Combiner就是Reducer,对输入的数据有要求,数据一定是要有序的。
在数据从环形缓冲区出来之前,会经过combiner合并一次,归并完成之后会合并一次,最终将某一个map中的数据写到磁盘上,局部汇总。
最终的数据汇总还是会由Reduce完成,Reduce负责全局汇总。
在Driver类中添加job.setConbinerClass(Reducer类名.class)即可开启局部汇总功能,Conbiner功能可以大幅度提升系统的性能。
GroupingComparator分组比较器:在reduce中的数据是一个key有序的数据,进一步需要将数据按照组输入到reduce中,在最后的分组过程中,会比较key的值,相同分为一组,不同分为两组,在这时候,会使用到分组比较器,这就是分组比较器的作用:判断两组数据key的值相不相同。
自定义GroupingComparator分组比较器:
public class OrderBean implements WritableComparable{ private String orderId; private String productId; private double price; @Override public String toString() { return "OrderBean{" + "orderId='" + orderId + ''' + ", productId='" + productId + ''' + ", price=" + price + '}'; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getProductId() { return productId; } public void setProductId(String productId) { this.productId = productId; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } @Override public int compareTo(OrderBean o) { int compare = this.orderId.compareTo(o.orderId); if(compare != 0){ return compare; }else { return Double.compare(o.price,this.price); } } @Override public void write(DataOutput out) throws IOException { out.writeUTF(orderId); out.writeUTF(productId); out.writeDouble(price); } @Override public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.productId = in.readUTF(); this.price = in.readDouble(); } } public class OrderMapper extends Mapper { private OrderBean order = new OrderBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拆分数据 String[] fields = value.toString().split("t"); //封装数据 order.setOrderId(fields[0]); order.setProductId(fields[1]); order.setPrice(Double.parseDouble(fields[2])); //写出数据 //mapper写出去的数据,最终会进入环形缓冲区,缓冲区会将数据写出来,到reduce之前会得到一个有序的数据 //进一步要对这个数据进行分组,如果没有重写分组比较器,会自动调用默认的WritableComparator,调用OrderBean //会按照订单和价格两个因素进行分组,所以下一步需要重写分组比较器 context.write(order,NullWritable.get()); } } public class OrderComparator extends WritableComparator { //初始化,提前将类创建好 //数据在进入环形缓冲区(byte字节数组)时,是要序列化之后,才能够进入的 //写入字节数组,排序时需要将数据反序列化,在本地准备几个空对象,通过空对象的readFields方法实现反序列化 //然后再进行比较,以下告诉它需要创建哪两个空对象 protected OrderComparator() { super(OrderBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean oa = (OrderBean) a; OrderBean ob = (OrderBean) b; return oa.getOrderId().compareTo(ob.getOrderId()); } } public class OrderReducer extends Reducer { @Override protected void reduce(OrderBean key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, NullWritable.get()); } } public class OrderDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(OrderDriver.class); //设置分组比较器 job.setGroupingComparatorClass(OrderComparator.class); job.setMapperClass(OrderMapper.class); job.setReducerClass(OrderReducer.class); job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }



