- MapReduce是Hadoop提供的一套分布式计算框架。从Hadoop2.0开始,MapReduce就是一个纯粹的计算框架MapReduce是Doug Cutting根据Google的The Google MapReduce来仿照实现的MapReduce会将整个计算过程拆分为2个阶段:Map(映射)阶段和Reduce(规约)阶段MapReduce在刚开始的时候,会对文件进行切片(Split)处理。切分完成之后,每一个Split会交给一个单独的MapTask来处理Split和Block
- 切片:Split,本质上是一种逻辑切分,切片之后,每一个切片会交给一个单独的子任务(MapTask来处理)切块:Block,本质上是一种物理切分,切分之后,每一个切块会交给某一个DataNode来存储如果没有指定,默认情况下,Split和Block等大
// Map阶段处理逻辑 // MapReduce中,要求被处理的数据都能够被序列化 // KEYIN - 表示输入的键的类型。如果不指定,则这个值表示当前行的字节偏移量 // VALUEIN - 表示输入的值的类型。如果不指定,则这个值表示当前行的一行数据 // KEYOUT - 表示输出的键的类型。当前案例中,键表示字符 // VALUEOUT - 表示输出的值的类型。当前案例中,值表示次数 public class CharCountMapper extends Mapper{ private final IntWritable once = new IntWritable(1); // Map阶段处理逻辑要覆盖到这个方法中 // Key: 键。当前行的自己偏移量 // value:值,当前行一行数据 // context:环境参数 @Override protected void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException { // key = 3 // value = "hello" // 转化字符串 char[] cs = value.toString().toCharArray(); // 遍历 // h:1 e:1 l:2 o:1 // h:1 e:1 l:1 l:1 o:1 for (char c : cs) { context.write(new Text(c + ""), once); } } }
// KEYIN,VALUEIN - 输入的键值类型。Reducer的数据从Mapper来的 // 也就意味着Mapper的输出就是Reducer的输入 // KEYOUT,VALUEOUT - 输出的键值类型。当前案例中,键是字符,值是总次数 public class CharCountReducer extends Reducer{ // key 键。当前案例中,表示的是字符 // values 值。表示字符对应的次数 // context 环境参数 @Override protected void reduce(Text key,Iterable value,Context context) throws IOException, InterruptedException { // key = 'a' // values = {1,1,1,1,1,1} int sum = 0; // 遍历迭代器求和 for (IntWritable val : values){ sum +=val.get(); } // 写出结果 context.write(key, new IntWritable(sum)); } }
public class CharCountDriver {
public static void main(String[] args) throws IOException {
//构建环境遍历
Configuration conf = new Configuration();
// 申请YARN运行
Job job = Job.getInstance(conf);
// 设置入口类
job.setJarByClass(CharCountDriver.class);
// 指定mapper类
job.setMapperClass(CharCountMapper.class);
// 指定Reduce类
job.setReducerClass(CharCountReducer.class);
// 指定Mapper类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定Reducer类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定输入路径
FileInputFormat.addInputPath(job, new Path("hdfs://hadoop01:9000/txt/characters.txt"));
// 指定输出路径 - 要求输出路径不存在
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop01:9000/result/char_count"));
// 提交任务
job.waitForCompletion(true);
}
}
组件
序列化
1.MapReduce中,要求所有被传输的数据必须能够被序列化。MapReduce的序列化机制依赖于AVRO来实现的,但是在AVRO的基础上进行了封装,提供了更简单的方式,只需要让被序列化的类实现接口writable即可
Partitioner - 分区- 分区的作用是对数据进行分类处理案例:分地区,统计每一个人花费的总流量在分区的时候,需要对分区进行编号,编号从0开始在MapReduce中,默认只有1个分区,也只有一个1个ReduceTask来处理数据,最后只会产生一个结果文件分区的数量决定ReduceTask的数量在MapReduce中,如果不指定Partitioner,那么默认使用的是HashPartitioner
- MapReduce默认会对放在键的位置上的元素进行排序,因此要求放在键的位置上的元素所对应的类必须实现接口Comparable
Combiner会降低数据条数但是不改变最后的计算结果
经过测试,使用Combiner大约能够提升40%的效率
Combiner的逻辑和Reducer的逻辑是一样的
指定Combiner
// 指定Combiner类 job.setCombinerClass(xxxReducer.class);
能够进行结果传递的场景可以使用Combiner,例如求和、求积、最值、去重;不能够进行结果传递的场景不可以使用Combiner,例如平均值
InputFormat发生在Mapper之前,负责对文件进行切片以及读取,将读取出来的数据交给Mapper处理,因此InputFormat读取出来的数据是什么格式,那么Mapper接受的就是什么格式
默认情况下,依靠FileInputFormat来对文件进行切片,依靠TextInputFormat来对切片进行读取
Split最小是1B,最大是Long.MAX_VALUE个字节
MapReduce中,文件存在是否可以进行逻辑切分的问题。通常情况下,认为文件是可以切分的,但是如果是压缩文件 ,那么不一定。如果文件不可切,则整个文件作为一个切片(Split)来处理;
默认情况下,Split大小和Block大小等大。如果需要调小SplitSize,那么需要将maxSize调小;如果需要将Split调大,把么需要将minSize调大
protected long computeSplitSize(long blockSize,long minSize,long maxSize) {
return Math.max(minSize,Math.min(maxSize,blockSize));
}
在切片过程中,需要注意默认阈值1.1。如果一个文件有520M大小,这个文件对应5个Block:4 * 128 + 8M;同时这个文件对应4个Split:3 * 128M + 136M
在MapReduce中,默认只有BZip2Code(后缀.bz2)可以被切分
自定义inputFormat:定义一个类继承InputFormat。但是考虑到getSplits过程比较复杂,所以考虑顶一个类继承子类FileInputFormat
多源输入:在MapReduce中,允许一次性输入多个路径来同时处理不同文件。在处理的时候,文件的格式可以相同也可以不同。如果文件格式一致,那么可以使用FileInputFormat;如果文件格式不一致,那么需要使用MultipleInputs。多源输入的时候,要求文件之间没有关联
- OutputFormat是发生在Reduce之后,将Reduce产生的结果来写出到指定的位置上,因此Reduce输出的结果为什么类型,OutputFormat接收的就是什么类型在MapReduce中,默认情况下,通过FileOutFormat来校验输出路径是否存在,通过TextOutputFormat来写出数据MapReduce也支持自定义输出格式,但是实际过程中很少使用
MapTask调用map方法处理数据产生结果,将结果写到MapTask自带的缓冲区,缓冲区维系在内存中。每一个MapTask都自带一个缓冲区
缓冲区维系在内存中,默认大小是100M(可以通过属性mapreduce.task.io.sort.mb来修改,单位是M),本质上是一个环形的字节数组
数据在缓冲区会进行分区(partition),排序(sort)。如果指定了Combiner,那么还会进行combine(合并)操作。这一次排序是将毫无规律的数据整理成有序的数据,采用的是快速排序(Quick Sort)
当缓冲区使用达到指定阈值(默认是0.8,通过属性mapreduce.map.sort.spill.percent来修改)的时候,会将缓冲区中的数据进行溢写(spill),产生一个溢写文件(spillable file)。因为数据在缓冲区中已经分区排序,所以产生的溢出文件中的数据也是分好区排好序的
溢写之后,缓冲区空出,那么MapTask后来产生的数据会继续写到缓冲区中,当再次达到阈值的时候,会再次进行溢写,每次溢写都会产生一个新的溢写文件。溢写文件之间的数据是局部有序整体无序的
当MapTask处理完所有的数据,会将所有的溢写文件进行合并(merge),合并成一个大的结果文件(final out)。如果MapTask处理完所有数据,依然有结果在缓冲区中,那么会将缓冲区的数据直接冲刷到结果文件final out中
在merge过程中,数据会再次进行分区和排序。如果指定了Combiner并且溢写文件的个数>=3个,那么在merge过程中还会进行combine操作,因此final out文件中的数据是分好区且排好序的。这次排序是将局部有序的数据整理成整体有序的数据,采用的是归并排序(Merge Sort)
merger合并会减少文件个数,combine合并会减少文件条数
注意问题:
- 缓冲区设置为环形的目的是为了减少重复寻址阈值的目的是为了降低阻塞的几率溢写文件的大小要考虑序列化因素的影响原始数据大小并不能直接决定溢写次数
ReduceTask达到启动阈值(0.05,即有5%的MapTask结束,通过属性mapreduce.job.reduce.slowstart.completedmaps来调节)的时候,会启动抓取MapTask的数据
ReduceTask启动之后,会启动一类fetch线程来获取数据。默认情况下,每一个ReduceTask能够启动5个fetch线程来获取数据,可以通过属性mapreduce.reduce.shuffle.parallelcopies来调节
fetch线程启动之后,会通过http请求中的GET请求来获取数据,在发送GET的请求之后,会携带当前ReduceTask要处理的分区号
fetch线程获取到数据之后,会将数据临时存储到本地磁盘上形成一个个的小文件。当fech线程抓取完所有数据之后,会将这些小文件进行合并(merge),合并成一个大文件。在merge过程中,依然会对数据进行排序,这次排序使用的是归并排序
merge完成之后,会将相同的键对应的值分到一组去形成一个伪迭代器(本质上是一个基于迭代模式的流),这个过程称之为分组(group)。分组完成之后,每个键触发调用一次reduce方法
- 同时处理多个文件,并且文件之间还相互关联。在处理的时候,需要确定一个主文件,将其他的关联文件作为缓存存根的方式来放到缓存中,当处理主文件的时候需要用到其他文件的时候,再从缓存中来获取关联文件进行处理
- 数据倾斜指的是任务之间处理的数据量不均等实际过程中,大部分倾斜产生在Reduce端,但是Map端也会产生倾斜。Map端的倾斜条件:多源输入、文件大小不均等且文件大小不可切。Map端产生倾斜的条件缺一不可 - Map端的数据倾斜无法解决实际过程中,有超过90%的数据倾斜是Reduce产生的。Reduce端产生数据倾斜的直接原因是对数据进行分类,本质原因是数据本身就是倾斜特性对于Reduce端的数据倾斜,可以考虑使用二阶段聚合的方式来解决。第一个阶段的目的是把数据打散之后分步聚合,第二个阶段再进行最终的聚合 。数据量越大且倾斜度越高的时候,二阶段聚合的效率才越高
小文件的危害:
- 存储:每一个文件对应一条元数据。如果存储大量小文件,那么产生大量的元数据,从而导致查询效率变低计算:每一个小文件会对应一个切片。如果计算大量小文件,那么会产生大量MapTask。MapTask是MapReduce任务的子任务(本质上可以看作是一个线程),在执行的时候需要耗费集群资源的。如果产生大量的MapTask,那么就需要进行集群资源的大量调度,同时导致大量线程的创建和销毁
目前市面上,针对小文件的处理手段无法两种:合并和打包
Hadoop提供了一种原生的打包手段:Hadoop Archive
hadoop archive -archiveName txt.har -p / txt /result
推测执行机制是MapReduce针对慢任务的一种优化:当MapReduce中,出现慢任务的时候,会将这个任务拷贝一份到其他的节点,两个节点同时执行相同的任务,谁先执行完,谁的结果就作为最后的结果,另一个没有被执行完的任务就会被kill掉
慢任务出现的场景:
- 数据倾斜任务分配不均匀节点性能不一致
在实际开发过程中,因为数据倾斜导致出现慢任务的概率更高,而此时推测执行机制是无效的,并且导致占用更多的资源,因此实际过程中一般会考虑关闭推测执行机制。通过属性来关闭
# true为开启 mapreduce.map.speculative = true mapreduce.reduce.speculative = true



