栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

山东大学软件工程应用与实践——Spark项目(六)

山东大学软件工程应用与实践——Spark项目(六)

2021SC@SDUSC
前言

        上次博客分析了Reduce端读取中间计算结果的详细过程,沿着Mapreduce架构的运算过程,我们此次顺理成章地分析reduce端的计算过程,并对map端和reduce端进行综合分析。


 一.reduce端计算 1.1 reduce如何同时处理多个map任务的中间结果 

        在实际使用中,reduce任务的上游map任务可能有多个,根据之前博客的分析,我们知道这些中间结果的Block及数据都缓存在迭代器ShuffleBlockFetcherlterator的results: new linkedBlock-ingQueue[FetchResult]中。ShuffeBlockFetcherlterator作为迭代器,其实现如下:

override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

override def next(): (BlockId, Try[Iterator[Any]]) = {
    numBlocksProcessed += 1
    val startFetchiait = System.currentTimeMillis()
    currentResult = results.take()
    val result = currentResult
    val stopFetchWait = System.currentTineMillis()
    shuffleMetrics.fetchWaitTime += (stopFetchWait - startFetchWait)

    result match {
        case SuccessFetchResult(_, size, _) => bytesInFlight -= size
        case _ =>
    }
    while (fetchRequests.nonEmpty && (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFliaht)) {
        sendRequest(fetchRequests.dequeue())
    }

    val iteratorTry: Try[Iterator[Any]] = result match {
        case FailureFetchResult(_, e) =>
            Failure(e)
        case SuccessFetchResult(blockId, _,buf) =>
            Try(buf.createInputStream()).map { is0 =>
                val is = blockManaqer.wrapForCompression(blockId, is0)
                val iter = serializer.newInstance().deserializeStream(is).asIterator
                CompletionIterator[Any, Iterator[Any]](iter, {
                    currentResult = null
                    buf.release()
                })
            }
        }

        (result.blockId, iteratorTry)
    }
}

        分析以上代码可知,每次迭代,ShuffeBlockFetcherIterator会先从results: new linkedBlock-ingQueue[FetchResult]中取出一个FetchResult,并构造此FetchResult的迭代器iteratorTry,而具体迭代的数据就是从iteratorTry中获取。每当iteratorTry迭代结束,才会再次迭代ShuffleBlockFet-cherlterator。


 1.2 reduce端在缓存中对中间计算结果执行聚合和排序

        reduce端获取map端计算中间结果后,会将ShuffleBlockFetcherlterator封装为InterruptibleI-terator并进行聚合。其聚合操作主要依赖aggregator的combineCombinersByKey方法,其实现如下:

def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext): Iterator[(K, C)] = 
    {
        if(!isSpillEnabled) {
            val combiners = new AppendonlyMap[K,C]
            var kc: Product2[K, C] = null
            val update = (hadValue: Boolean, oldValue: C) => {
                if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
            }
            while (iter.hasNext) {
                kc = iter.next()
                combiners.changevalue(kc._1, update)
            }
            combiners.iterator
        } else {
            val combiners = new ExternalAppendonlyMap[K, c, C](identity, mergeCombiners, mergeCombiners) {
            while (iter.hasNext) {
                val pair = iter.next()
                combiners.insert(pair._1, pair._2)
            }
            Option(context).foreach { c =>
                c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
                c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
        }
        combiners.iterator
    }
}

        分析以上代码可知,if语句中,如果 isSpillEnabled 为 false ,则会再次使用AppendOnlyMap的changevalue方法,该方法已在之前关于 AppendOnlyMap的缓存聚合算法的博客分析过,在此不作赘述。而如果 isSpillEnabled 为 true(默认值),那么就会使用 ExternalAppendonlyMap 完成聚合。 其实质也是使用SizeTrackingAppendOnlyMap,该方法也已在之前关于 AppendOnly-Map的缓存聚合算法的博客中分析过了。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/457979.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号