核心思想:分治
Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
Reduce负责“合”,即对map阶段的结果进行全局汇总。
主要特点
把一个大的问题,划分成很多小的子问题,并且每个小的子问题的求取思路与我们大问题的求取思路一致
MapReduce框架结构一个完整的mapreduce程序在分布式运行时有三类实例进程:
1、MRAppMaster:负责整个程序的过程调度及状态协调
2、MapTask:负责map阶段的整个数据处理流程
3、ReduceTask:负责reduce阶段的整个数据处理流程
mapReduce编程模型的总结:
MapReduce的开发一共有八个步骤其中map阶段分为2个步骤,shuffle阶段4个步骤,reduce阶段分为2个步骤
Map阶段2个步骤
第一步:设置inputFormat类,将我们的数据切分成key,value对,输入到第二步
第二步:自定义map逻辑,处理我们第一步的输入数据,然后转换成新的key,value对进行输出
shuffle阶段4个步骤(可以全部不用管)
第三步:对输出的key,value对进行分区。相同key的数据发送到同一个reduce里面去,相同key合并,value形成一个集合
第四步:对不同分区的数据按照相同的key进行排序
第五步:对分组后的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)
第六步:对排序后的额数据进行分组,分组的过程中,将相同key的value放到一个集合当中
reduce阶段2个步骤
第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出
第八步:设置outputformat将输出的key,value对数据进行保存到文件中
MapReduce程序运行模式 本地运行模式
mapreduce程序是被提交给LocalJobRunner在本地以单进程的形式运行,本质是程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local参数
优点:便于进行业务逻辑的debug,只要在IDEA中打断点即可
代码实现:
configuration.set("mapreduce.framework.name","local");
configuration.set("yarn.resourcemanager.hostname","local");
TextInputFormat.addInputPath(job,new Path(""));
TextOutputFormat.setOutputPath(job,new Path(""));
集群运行模式
将mapreduce程序提交给yarn集群,分发到很多的节点上并发执行,处理的数据和输出结果应该位于hdfs文件系统
实现步骤:将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动
例:
hadoop jar hadoop_hdfs_operate-1.0-SNAPSHOT.jar com.jmg.hdfs.demo1.JobMainMapReduce分区
在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理,即相同类型的数据,送到一起去处理,在reduce当中默认分区只有1个。
MapReduce排序以及序列化序列化(Serialization)是指把结构化对象转化为字节流。反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。 当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流。反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。
hadoop当中没有沿用Java序列化serialize方式,使用的是writable接口,实现了writable接口就可以序列化。如果需要序列化,需要实现writable接口;如果需要排序,需要实现comparable接口;如果既需要序列化,也需要排序,可以实现writable和comparable或者writableComparable
MapTask运行机制- 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。
- 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以n作为分隔符,读取一行数据,返回
。Key表示每行首字符偏移值,value表示这一行文本内容。 - 读取split返回
,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。 - map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。
- 接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。
当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。
合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
至此map整个阶段结束。
- 读取split返回
ReduceTask
1、Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
2、Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
3、合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
4、对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。



