栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

山东大学软件工程应用与实践: Spark(四) 代码分析

山东大学软件工程应用与实践: Spark(四) 代码分析

2021SC@SDUSC


目录

SparkEnv

1.实例化ShuffleManager

2.shuffle线程内存管理器ShuffleMemoryManager

3.块传输服务BlockTransferService

4.BlockManagerMaster介绍

5.创建块管理器BlockManager

6.创建广播管理器BrodacastManager


SparkEnv

1.实例化ShuffleManager

ShuffleManager负责管理本地及远程的block数据的shuffle操作。ShuffleManager默认为通过反射方式生成SortShuffleManager的实例,可以修改属性spark.shuffle.manager为hash来显式控制使用HashShuffleManager。SortShuffleManager通过持有的IndexShuffleBlockManager间接操作BlockManager中的DiskBlockManager将map结果写入本地,并根据shuffleId、mapId写入索引文件,也能通过MapOutputTrackerMaster中维护的mapStatuses从本地或者其他远程节点读取文件。

Spark作为并行计算框架,同一个作业会被分为多个任务在多个节点上并行执行,reduce的输入可能存在于多个节点上,因此需要通过“洗牌”将所有reduce的输入汇总起来,这个过程就是shuffle。代码如下

    val shortShuffleMgrNames = Map(
        "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
        "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
    val shufflMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.get
OrElse( shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager] (shuffleMgrClass)
        
    val shuffleMemoryManager = new ShuffleMemoryManager(conf)

2.shuffle线程内存管理器ShuffleMemoryManager

ShuffleMemoryManager负责整理shuffle线程占有内存的分配与释放,并通过thread-Memort:mutable.HashMap[Long, Long] 缓存每个线程的字节数。

private[spark] class ShuffleMemoryManager(maxMemory: Long) extends Logging{
    private val threadMemory = new mutable.HashMap[Long, Long]() // threadId -> memory         
         bytes
    def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))

getMaxMemory方法用于获取shuffle所有线程占用的最大内存,实现如下

def getMaxMemory(conf: SparkConf): Long = {
    val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
    val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
    (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
}

上面的代码表明,shuffle所有线程占用的最大内存计算公式为:

Java运行时最大内存*Spark的shuffle最大内存占比*Spark的安全内存占比

可以配置属性spark.shuffle.memoryFraction修改Spark的shuffle最大内存占比,配置属性spark.shuffle.safetyFraction修改Spark的安全内存占比

3.块传输服务BlockTransferService

BlockTransferService默认为NettyBlockTransferService(可以配置属性spark.shuffle.blockTransferService使用NioBlockTransferService),它使用Netty提供的异步事件驱动的网络应用框架,提供web服务及客户端,获取远程节点上block的集合

val blockTransferService = 
    conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
        case "netty" =>
            new NettyBlockTransferService(conf, securityManager, numUsableCores)
        case "nio" =>
            new NioBlockTransferService(conf, securityManager)
    }

4.BlockManagerMaster介绍

BlockManagerMaster负责对Block的管理和协调,具体操作依赖于BlockManagerMasterActor。Driver和Executor处理BlockManagerMaster的方式不同:

  • 如果当前应用程序是Driver,则创建BlockManagerMasterActor,并且注册到ActorSystem中
  • 如果当前应用程序是Executor,则从ActorSystem中找到BlockManagerMasterActor。

无论是Driver还是Executor,最后BlockManagerMaster的属性driverActor将持有对BlockManagerMasterActor的引用。BlockManagerMaster的代码如下:

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
    "BlockManagerMaster",
    new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)

5.创建块管理器BlockManager

BlockManager负责对Block的管理,只有在BlockManager的初始化方法initialize被调用后,它才是有效的。BlockManager作为存储系统的一部分,代码如下

val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)

6.创建广播管理器BroadcastManager

BroadcastManager用于将配置信息和序列化后的RDD、job以及ShuffleDependency等信息在本地存储。如果为了容灾,也会复制到其他节点上。创建BroadcastManager的代码如下:

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

BroadcastManager必须在其初始化方法initialize被调用后,才能生效。initialize方法实际利用反射生成广播工厂实例broadcastFactory(可以配置属性spark.broadcast.factory指定, 默认为org.apache.spark.broadcast.TorrentBroadcastFactory)。 BroadcastManager的广播方法newBroadcast 实际代理了工厂broadcastFactory的newBroadcast方法来生成广播对象。 unbroadcast方法实际代理了工厂broadcastFactory的unbroadcast方法生成非广播对象 BroadcastManager的initialize、 unbroadcast及newBroadcast方法代码如下

private def initalize() {
    synchronized{
        if (!initialized) {
            val broadcastFactoryClass = conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
            broadcastFactory =         
        Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
       broadcastFactory.initialize(isDriver, conf, securityManager)
initialized = true
}
}
}

private val nextBroadcastId = new AtomicLong(0) 

def newBroadcast[T: ClassTag] (value_ : T, isLocal: Boolean) = {
    broadcastFactory.newBroadcast[T] (value_, isLocal, nextBroadcastId.getAndIncrement()}
}

def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean){
    broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/632782.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号