Preface
The previous post analyzed in detail how the reduce side fetches the intermediate map output. Following the MapReduce-style flow of computation, this post naturally moves on to the reduce-side computation, and then looks at the map and reduce sides together.
1. Reduce-side computation
1.1 How reduce handles intermediate results from multiple map tasks
In practice, a reduce task usually has multiple upstream map tasks. As analyzed in earlier posts, the fetched blocks and their data are buffered in the iterator ShuffleBlockFetcherIterator's results: new LinkedBlockingQueue[FetchResult]. As an iterator, ShuffleBlockFetcherIterator is implemented as follows:
override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

override def next(): (BlockId, Try[Iterator[Any]]) = {
  numBlocksProcessed += 1
  val startFetchWait = System.currentTimeMillis()
  currentResult = results.take()
  val result = currentResult
  val stopFetchWait = System.currentTimeMillis()
  shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)
  result match {
    case SuccessFetchResult(_, size, _) => bytesInFlight -= size
    case _ =>
  }
  // Send more fetch requests, up to the maxBytesInFlight limit
  while (fetchRequests.nonEmpty &&
    (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
    sendRequest(fetchRequests.dequeue())
  }
  val iteratorTry: Try[Iterator[Any]] = result match {
    case FailureFetchResult(_, e) =>
      Failure(e)
    case SuccessFetchResult(blockId, _, buf) =>
      // Decompress and deserialize the block into an iterator; release the
      // underlying buffer once the iterator has been fully consumed
      Try(buf.createInputStream()).map { is0 =>
        val is = blockManager.wrapForCompression(blockId, is0)
        val iter = serializer.newInstance().deserializeStream(is).asIterator
        CompletionIterator[Any, Iterator[Any]](iter, {
          currentResult = null
          buf.release()
        })
      }
  }
  (result.blockId, iteratorTry)
}
As the code above shows, on each iteration ShuffleBlockFetcherIterator first takes one FetchResult from results: new LinkedBlockingQueue[FetchResult] and builds an iterator iteratorTry for that FetchResult; the actual records are then read from iteratorTry. Only when iteratorTry is exhausted does the consumer advance ShuffleBlockFetcherIterator again.
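To make this two-level iteration concrete, here is a minimal, self-contained sketch of the pattern (not Spark code: FetchedBlock and fetchAll are hypothetical stand-ins for FetchResult and the surrounding fetch logic). The outer iterator takes one block at a time from a LinkedBlockingQueue and exposes it as a per-block record iterator; flattening the outer iterator means the next block is taken from the queue only after the current block's records are exhausted.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.util.Try

object FetchIteratorSketch {
  // Hypothetical stand-in for FetchResult: a block id plus its deserialized records
  final case class FetchedBlock(blockId: String, records: Seq[Int])

  def fetchAll(blocks: Seq[FetchedBlock]): List[Int] = {
    // In Spark this queue is filled asynchronously as remote fetches complete
    val results = new LinkedBlockingQueue[FetchedBlock]()
    blocks.foreach(results.put)

    val numBlocksToFetch = blocks.size
    var numBlocksProcessed = 0

    // Outer iterator: yields one (blockId, Try[Iterator]) pair per block,
    // mirroring ShuffleBlockFetcherIterator.next()
    val outer = new Iterator[(String, Try[Iterator[Int]])] {
      def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch
      def next(): (String, Try[Iterator[Int]]) = {
        numBlocksProcessed += 1
        val result = results.take() // blocks until a fetch result is available
        (result.blockId, Try(result.records.iterator))
      }
    }

    // The consumer flattens the per-block iterators into one record stream
    outer.flatMap { case (_, it) => it.get }.toList
  }
}
```

For example, fetchAll(Seq(FetchedBlock("block-0", Seq(1, 2)), FetchedBlock("block-1", Seq(3, 4, 5)))) returns List(1, 2, 3, 4, 5): block-1 is only dequeued once block-0's two records have been consumed.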
1.2 In-memory aggregation and sorting of intermediate results on the reduce side
After fetching the intermediate map output, the reduce side wraps the ShuffleBlockFetcherIterator in an InterruptibleIterator and aggregates it. The aggregation relies mainly on the Aggregator's combineCombinersByKey method, implemented as follows:
def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext): Iterator[(K, C)] = {
  if (!isSpillEnabled) {
    val combiners = new AppendOnlyMap[K, C]
    var kc: Product2[K, C] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
    }
    while (iter.hasNext) {
      kc = iter.next()
      combiners.changeValue(kc._1, update)
    }
    combiners.iterator
  } else {
    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
    while (iter.hasNext) {
      val pair = iter.next()
      combiners.insert(pair._1, pair._2)
    }
    Option(context).foreach { c =>
      c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
      c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
    }
    combiners.iterator
  }
}
As the code above shows, if isSpillEnabled is false, the first branch again uses AppendOnlyMap's changeValue method, which was analyzed in the earlier post on AppendOnlyMap's in-memory aggregation algorithm and is not repeated here. If isSpillEnabled is true (the default), aggregation is done with ExternalAppendOnlyMap, which is itself built on SizeTrackingAppendOnlyMap, also covered in that earlier post.
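The changeValue-based aggregation in the non-spilling branch can be illustrated with a small stand-alone sketch (this uses a plain mutable HashMap in place of Spark's AppendOnlyMap, and combineByKey here is a hypothetical helper, not the real Aggregator API). The key idea is the update closure: it is told whether the key already had a value and, if so, merges the existing combiner with the incoming one.

```scala
import scala.collection.mutable

object CombineSketch {
  // Simplified analogue of the non-spilling branch of combineCombinersByKey:
  // fold each (key, combiner) pair into a map via an update closure
  def combineByKey[K, C](iter: Iterator[(K, C)], mergeCombiners: (C, C) => C): Map[K, C] = {
    val combiners = mutable.HashMap.empty[K, C]
    var kc: (K, C) = null
    // (hadValue, oldValue) => newValue, exactly the shape changeValue expects
    val update = (hadValue: Boolean, oldValue: C) =>
      if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
    while (iter.hasNext) {
      kc = iter.next()
      val had = combiners.contains(kc._1)
      val old = if (had) combiners(kc._1) else null.asInstanceOf[C]
      combiners(kc._1) = update(had, old) // AppendOnlyMap does this in one hash probe
    }
    combiners.toMap
  }
}
```

For example, combineByKey(Iterator(("a", 1), ("a", 2), ("b", 3)), (x: Int, y: Int) => x + y) returns Map("a" -> 3, "b" -> 3). The real AppendOnlyMap performs the lookup and update in a single probe of its open-addressing hash table, which is why the update-closure shape is used instead of a plain get/put.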



