MapReduce总结
前言
MapReduce是一个分布式计算的框架,由Google在2004年的论文中发表,之前已经对该论文进行了翻译工作,这篇博客主要针对其中的重点进行总结。
整个MapReduce的论文,基本上可以看作三个部分:
-
MapReduce的编程模型和应用场景;
-
MapReduce实际的实现机制;
-
MapReduce的性能迭代优化;
MapReduce编程模型
MapReduce的编程模型非常简单,只有两个函数:Map函数与Reduce函数。
Map 函数,顾名思义就是一个映射函数,它会接受一个 key-value 对,然后把这个 key-value 对转换成 0 到多个新的 key-value 对并输出出去。
map (k1, v1) -> list (k2, v2)
Reduce 函数,则是一个化简函数,它接受一个 Key,以及这个 Key 下的一组 Value,然后化简成一组新的值 Value 输出出去。
reduce (k2, list(v2)) -> list(v3)
而在 Map 函数和 Reduce 函数之外,开发者还需要指定一下输入输出文件的路径。输入路径上的文件内容,会变成一个个键值对给到 Map 函数。而 Map 函数会运行开发者写好的映射逻辑,把数据作为新的一组键值对输出出去。
Map 函数的输出结果,会被整个 MapReduce 程序接手,进行一个叫做混洗的操作。混洗会把 Map 函数输出的所有相同的 Key 的 Value 整合到一个列表中,给到 Reduce 函数。并且给到 Reduce 函数的 Key,在每个 Reduce 里,都是按照 Key 排好序的。
这里排好序并不是 MapReduce 框架本身的核心需求,而是为了技术上实现方便。因为我们要把相同 Key 的数据放到一起处理,而通过一个 HashMap 把所有的数据放在内存里又不一定放得下。那么利用硬盘进行外部排序是一个最简单的,没有内存大小依赖的对数据根据 Key 进行分组的解决办法。最后,在拿到混洗完成,分好组的数据之后,Reduce 函数就会运行你写好的化简逻辑,最终输出结果。
MapReduce的应用场景
论文里列出了六个它能够实现的应用场景:
-
分布式 grep;
-
统计 URL 的访问频次;
-
反转网页 - 链接图;
-
分域名的词向量;
-
生成倒排索引;
-
分布式排序。
MapReduce的实现机制
MapReduce的实现中有两个值得重视的问题:
-
第一个,自然是如何做好各个服务器节点之间的“协同”;
-
第二个就是解决出现各种软硬件问题后的“容错”。
MapReduce的协同
一个MapReduce的集群,通常会有一个调度系统。当我们要运行一个MapReduce任务的时候,其实就是把这个任务提交给调度系统,让调度系统来分配和安排Map函数与Reduce函数。
在 MapReduce 任务提交了之后,整个 MapReduce 任务就会按照这样的顺序来执行。
第一步,你写好的 MapReduce 程序,已经指定了输入路径。所以 MapReduce 会先找到 GFS 上的对应路径,然后把对应路径下的所有数据进行分片(Split)。每个分片的大小通常是 64MB,这个尺寸也是 GFS 里面一个块(Block)的大小。接着,MapReduce 会在整个集群上,启动很多个 MapReduce 程序的复刻(fork)进程。
第二步,在这些进程中,有一个和其他不同的特殊进程,就是一个 master 进程,剩下的都是 worker 进程。然后,我们会有 M 个 map 的任务(Task)以及 R 个 reduce 的任务,分配给这些 worker 进程去进行处理。这里的 master 进程,是负责找到空闲的(idle)worker 进程,然后再把 map 任务或者 reduce 任务,分配给 worker 进程去处理。这里你需要注意一点,并不是每一个 map 和 reduce 任务,都会单独建立一个新的 worker 进程来执行。而是 master 进程会把 map 和 reduce 任务分配给有限的 worker,因为一个 worker 通常可以顺序地执行多个 map 和 reduce 的任务。
第三步,被分配到 map 任务的 worker 会读取某一个分片,分片里的数据就像上一讲所说的,变成一个个 key-value 对喂给了 map 任务,然后等 Map 函数计算完后,会生成的新的 key-value 对缓冲在内存里。
第四步,这些缓冲了的 key-value 对,会定期地写到 map 任务所在机器的本地硬盘上。并且按照一个分区函数(partitioning function),把输出的数据分成 R 个不同的区域。而这些本地文件的位置,会被 worker 传回给到 master 节点,再由 master 节点将这些地址转发给 reduce 任务所在的 worker 那里。
第五步,运行 reduce 任务的 worker,在收到 master 的通知之后,会通过 RPC(远程过程调用)来从 map 任务所在机器的本地磁盘上,抓取数据。当 reduce 任务的 worker 获取到所有的中间文件之后,它就会将中间文件根据 Key 进行排序。这样,所有相同 Key 的 Value 的数据会被放到一起,也就是完成了我们上一讲所说的混洗(Shuffle)的过程。
第六步,reduce 会对排序后的数据执行实际的 Reduce 函数,并把 reduce 的结果输出到当前这个 reduce 分片的最终输出文件里。
第七步,当所有的 map 任务和 reduce 任务执行完成之后,master 会唤醒启动 MapReduce 任务的用户程序,然后回到用户程序里,往下执行 MapReduce 任务提交之后的代码逻辑。
MapReduce的容错
MapReduce的容错机制中主要用了两种手段:重新运行和写Checkpoints。
worker节点的失效
对于 worker 节点的失效,MapReduce 框架解决问题的方式非常简单。就是换一台服务器重新运行这个 worker 节点被分配到的所有任务。master 节点会定时地去 ping 每一个 worker 节点,一旦 worker 节点没有响应,我们就会认为这个节点失效了。
于是,我们会重新在另一台服务器上,启动一个 worker 进程,并且在新的 worker 进程所在的节点上,重新运行所有失效节点上被分配到的任务。而无论失效节点上,之前的 map 和 reduce 任务是否执行成功,这些任务都会重新运行。因为在节点 ping 不通的情况下,我们很难保障它的本地硬盘还能正常访问。
master节点的失效
对于 master 节点的失效,事实上谷歌已经告诉了我们,他们就任由 master 节点失败了,也就是整个 MapReduce 任务失败了。那么,对于开发者来说,解决这个问题的办法也很简单,就是再次提交一下任务去重试。
因为 master 进程在整个任务中只有一个,它会失效的可能性很小。而 MapReduce 的任务也是一个用户离线数据处理的任务,并不是一个实时在线的服务,失败重来通常也没有什么影响,只是晚一点拿到数据结果罢了。
虽然在论文发表的时候,谷歌并没有实现对于 master 的失效自动恢复机制,但他们也给出了一个很简单的解决方案,那就是让 master 定时把它里面存放的信息,作为一个个的 Checkpoint 写入到硬盘中去。
那么我们动一下脑筋,我们可以把这个 Checkpoint 直接写到 GFS 里,然后让调度系统监控 master。这样一旦 master 失效,我们就可以启动一个新的 master,来读取 Checkpoints 数据,然后就可以恢复任务的继续执行了,而不需要重新运行整个任务。
对错误数据视而不见
在处理海量数据的情况下,可能会经常遇到“脏数据”的问题,从而导致Map与Reduce函数处理不了。MapReduce对于这些极少数的数据异常带来的问题,也提供了一个容错机制。
MapReduce 会记录 Map 或者 Reduce 函数,运行出错的具体数据的行号,如果同样行号的数据执行重试还是出错,它就会跳过这一行的数据。如果这样的数据行数在总体数据中的比例很小,那么整个 MapReduce 程序会忽视这些错误,仍然执行完成。毕竟,一个 URL 被访问了 1 万次还是 9999 次,对于搜素引擎的排序结果不会有什么影响。
MapReduce的性能优化
MapReduce主要的性能优化有三点:
-
移动数据不如移动程序
-
中间数据能少则少
-
备份任务
把程序搬到数据那儿去
在 MapReduce 这个框架下,就是在分配 map 任务的时候,根据需要读取的数据在哪里进行分配。通过前面 GFS 论文的学习,我们可以知道,GFS 是知道每一个 Block 的数据是在哪台服务器上的。而 MapReduce,会找到同样服务器上的 worker,来分配对应的 map 任务。如果那台服务器上没有,那么它就会找离这台服务器最近的、有 worker 的服务器,来分配对应的任务。
通过 Combiner 减少网络数据传输
除了 Map 函数需要读取输入的分片数据之外,Reduce 所在的 worker 去抓取中间数据,一样也需要通过网络。那么要在这里减少网络传输,最简单的办法,就是尽可能让中间数据的数据量小一些。
自然,在 MapReduce 的框架里,也不会放过这一点。MapReduce 允许开发者自己定义一个 Combiner 函数。这个 Combiner 函数,会对在同一个服务器上所有 map 输出的结果运行一次,然后进行数据合并。
备份任务
MapReduce 操作总时间拖延的常见原因之一是“拖尾”现象:一台机器花费异常长的时间来完成最后几个 map 或者 reduce 任务。拖尾之所以出现,可能是由于多种原因。例如,磁盘损坏的机器可能会经常遇到可纠正的错误,从而将其读取性能从30 MB/s降低到1 MB/s。集群调度系统可能在机器上调度了其他任务,由于竞争CPU、内存、本地磁盘或网络带宽,导致其执行MapReduce代码的速度较慢。
MapReduce有一个通用的机制来减轻拖尾的影响。当MapReduce操作接近完成时,主服务器会调度其余正在进行的任务进行备份执行。每当主执行或备份执行完成时,该任务就会标记为已完成,这大大减少了完成大型MapReduce操作的时间。
MapRedcue的debug信息
MapReduce提供了三种方法来提升对于开发者的易用性:
第一个,是提供一个单机运行的 MapReduce 的库,这个库在接收到 MapReduce 任务之后,会在本地执行完成 map 和 reduce 的任务。这样,你就可以通过拿一点小数据,在本地调试你的 MapReduce 任务了,无论是 debugger 还是打日志,都行得通。
第二个,是在 master 里面内嵌了一个 HTTP 服务器,然后把 master 的各种状态展示出来给开发者看到。这样一来,你就可以看到有多少个任务执行完了,有多少任务还在执行过程中,它处理了多少输入数据,有多少中间数据,有多少输出的结果数据,以及任务完成的百分比等等。同样的,里面还有每一个任务的日志信息。另外通过这个 HTTP 服务器,你还可以看到具体是哪一个 worker 里的任务失败了,对应的错误日志是什么。
最后一个,是 MapReduce 框架里提供了一个计数器(counter)的机制。作为开发者,你可以自己定义几个计数器,然后在 Map 和 Reduce 的函数里去调用这个计数器进行自增。所有 map 和 reduce 的计数器都会汇总到 master 节点上,通过上面的 HTTP 服务器里展现出来。
遗憾与缺陷
尽管 MapReduce 框架已经作出了很多努力,但是今天来看,整个计算框架的缺陷还是不少的。在我看来,主要的缺陷有两个:
-
第一个是还没有 100% 做到让用户意识不到“分布式”的存在,无论是 Combiner 还是 Partitioner,都是让开发者意识到,它面对的还是分布式的数据和分布式的程序。
-
第二个是性能仍然不太理想,这体现在两个方面,一个是每个任务都有比较大的 overhead,都需要预先把程序复制到各个 worker 节点,然后启动进程;另一个是所有的中间数据都要读写多次硬盘。map 的输出结果要写到硬盘上,reduce 抓取数据排序合并之后,也要先写到本地硬盘上再进行读取,所以快不起来。
总结
最后用一张思维导图对以上内容进行一个总结:
参考
1、极客时间课程:《大数据经典论文解读》
2、MapReduce论文:https://pdos.csail.mit.edu/6.824/papers/mapreduce.pdf
3、MapReduce翻译:MapReduce论文翻译_qq1234534215的博客-CSDN博客



