栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce框架原理

MapReduce框架原理

MapReduce框架原理

1.切片与MapTask并行度决定机制

MapTask并行度决定机制

数据块:Block是HDFS物理上把数据分成一块一块。数据块是HDFS储存数据单元。数据切片:数据切片只是在逻辑上对输入进行切片,并不会在磁盘上将其切分成片进行储存。数据切片是MapReduce程序计算输入数据的单位,一个切片会对应一个MapTask。

一个Job的Map阶段并行度由客户端在提交Job时的切片数决定

每一个Split切片分配一个MapTask并行实例处理

默认情况下,切片大小 = BlockSize

切片时不考虑数据集整体,而是逐个针对每个文件单独切片

2. Job提交流程源码和切片源码详解

Job提交流程源码详解
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jq6MBiPw-1647147021234)(Hadoop生态圈之MapReduce.assets/1646898810057.png)]

FileInputFormat切片源码解析

程序先找到你数据存储的目录

开始遍历处理(规划切片)目录下的每一个文件

遍历第一个文件

获取文件大小

计算切片大小,默认情况下,切片大小 = Blocksize

computeSplitSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M

开始切片,每次切片时,都要判断切完剩下的部分是否大于块的1.1倍,不大于1.1倍就划分为一块

将切片信息写到一个切片规划文件中

整个切片的核心过程在getSplit()方法中完成

InputSplit只记录了切片的元数据信息,比如初始位置、长度以及所在节点列表等

提交切片规划文件到TARN上,YARN上的MrAppMaster就可以根据切片规划文件计算开启MapTask个数。

FileInputFormat切片大小的参数配置

源码中计算切片大小的公式

Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize=1 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
因此,默认情况下,切片大小=blocksize。

切片大小设置

maxsize(切片最大值):参数如果调的比blockSize小,则会让切片变小,而且就等于配置的这个参数的值minsize(切片最小值):参数跳的比blockSize大,则可以让切片变得比blockSize还大。

获取切片信息API

// 获取切片的文件名称
String name = inputSplit.getPath().getName();
// 根据文件类型获取切片信息
FileSplit inputSplit = (FileSplit) context.getInputSplit();

注意:TextInputFormat是默认的FileInputFormat实现类,按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型

ConbinTextInputFormat切片机制

将输入目录下所有文件大小,以此和设置的setMaxInputSplitSize值比较,如果大于或等于设置的最大值,就以最大值切割形成一块切片,剩下数据大小超过设置的最大值且不大于两倍,那么将文件均分成两个虚拟存储块(防止出现太小切片)。如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。 3 Shuffle机制

什么是Shuffle机制?

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle机制Shuffle是属于链接MapTask和ReduceTask的,二者都必须存在,若没有其中一个就没有Shuffle
4.什么是Partition分区?

Partition是Shuffle机制中环形缓冲区的元数据中的一个参数,可以根据业务要求来对Partition进行赋值,让它在Reduce阶段根据Partiton参数来进行分区,从而能根据key来进行控制分区。

默认Partition分区

public class HashPartitioner extends Partitioner {
	public int getPartition(K key, V value, int numReduceTasks) {
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}

默认分区是根据key的hashCode对ReduceTasks个数取模得到的,用户没法控制那个key存储到那个分区

自定义Partition步骤

自定义类继承Partition,重写getPartition()方法

public class CustomPartitioner extends Partitioner {
	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {
		// 控制分区代码逻辑
		… …
			return partition;
	}
}

在Job驱动中,设置自定义Partition

job.setPartitionerClass(CustomPartitioner.class);

自定义Partition后,要根据自定义Partition的逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);

分区总结

如果ReduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件;如果 1 < ReduceTask 的数量 < getPartition的结果数,则有一部分分区数据无处安放,会Exception;如果ReduceTask的数量 = 1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也只会产生一个结果文件;分区号必须从零开始,逐一累加。 5. WritableComparable排序

排序概述

MapTask和ReduceTask均会对数据key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会倍排序,而不管逻辑上是否需要默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到80%后,在对缓冲区中的数据进行一次快排。并将这些有序数据溢写到磁盘上,而当数据处理完毕之后,它会对磁盘上所有文件进行归并排序对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定的阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序生成一个更大文件;如果内存文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上,当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类

部分排序:

MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序。 全排序:

最终结果只有一个文件,且文件内部有序。实现方法是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构 辅助排序:(GroupingComparator分组)

在Reduce端对Key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同的Key进入同一个reduce方法时,可以采用分组排序 二次排序

自定义排序中,如果compareTo中的判断条件为两个即为二次排序

自定义排序WritableComparable原理分析

bean对象作为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序

@Override
public int compareTo(FlowBean bean) {
	int result;
	// 按照总流量大小,倒序排列
	if (this.sumFlow > bean.getSumFlow()) {
		result = -1;
	}else if (this.sumFlow < bean.getSumFlow()) {
		result = 1;
	}else {
		result = 0;
	}
	return result;
}

6.Combiner 合并

什么是Combiner合并

Combiner是MR程序中Mapper和Reduce之外的一种组件。Combiner组件的父类就是Reducer;Combiner和Reduce的区别在与运行位置

Combiner是在每一个MapTask所有的节点上运行Reduce是接收全局所有Mapper的输出结果 Combiner的意义就是对每一个MapTask的输出进行全局汇总,以减小网络传输量。**Combiner能够应用的前提是不能影响最终的业务逻辑,**而且,Combiner的输出kv应该跟Reduce的输入kv类型要对应起来。

自定义实现Combiner实现步骤

自定义一个Combiner继承Reduce,重写Reduce方法

public class WordCountCombiner extends Reducer {
 	private IntWritable outV = new IntWritable();
 	@Override
 	protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
	 int sum = 0;
	 for (IntWritable value : values) {
 			sum += value.get();
	 	}
 
 		outV.set(sum);
 
 		context.write(key,outV);
 	}
}

在Job驱动类中设置

job.setCombinerClass(WordCountCombiner.class);
7.ReduceTask并行决定机制

设置ReduceTask并行度(个数)

ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发度由切片数决定不同,ReduceTask数量的决定是可以直接手动设置的

// 默认值是 1,手动设置为 4
job.setNumReduceTasks(4);

注意事项

ReduceTask = 0,表示没有Reduce阶段,输出文件个数和Map个数一致。ReducTask默认值就是1,使用输出文件个数为一个。如果数据分布不均匀,就有可能中Reduce阶段产生数据倾斜ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有一个ReduceTask。具体多少个ReduceTask,需要根据集群性能而定。如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程,因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1.不大于1肯定不执行。 8.OutputFormat数据输出

OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。默认输出格式为TextOutputFormat 自定义OutputFormat

应用场景:输出数据到MySQL、Hbase、Elasticsearch等储存框架中。步骤:

自定义一个类继承FileOutputFormat改写RecordWriter,具体改写输出数据的方法write() 总结

输入数据接口:InputFormat

默认使用的实现类是:TextInputFormatTextInputFormat的功能呢逻辑是:一次读一行文本,然后将该行的起始偏移量作为key,行作为value返回ConbineTextInoutFormat可以把多个小文件合并成一个切片处理,提高处理效率 逻辑处理接口:Mapper

根据业务需求实现其中的三个方法:map(),setup(),cleanup() Partitioner分区

有默认实现HashPartitioner,逻辑是根据key的哈希值和numReduce来返回一个分区号如果业务上有特别的需求,可以自定义分区 Comparable排序

当我们用自定义的对象作为key来输出时,就必须要实现WritableComparable接口,重写其中comparaTo()方法部分排序:对最终的每个文件进行内部排序全排序:对所有数据进行排序,通常只有一个Reduce二次排序:排序的条件有两个 Combiner合并

Combiner合并可以提高程序执行效率,减少IO传输。但是使用时必须不能影响原有的业务处理结果 逻辑处理接口:Reducer

根据业务需求实现其中三个方法:reduce(),setup(),cleanup() 输出数据接口:OutputFormat

默认实现类是TextOutputFormat,功能逻辑是:将每个KV对,向目标文本文件输出一行用户还可以自定义OutputFormat

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/762074.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号