栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

山东大学软件工程应用与实践: Spark(三) 代码分析

山东大学软件工程应用与实践: Spark(三) 代码分析

2021SC@SDUSC


目录

SparkEnv

1.安全管理器SecurityManager

2.基于Akka的分布式消息系统ActorSystem

3.map任务输出跟踪器mapOutputTracker


​​​​​​​

SparkEnv

SparkEnv的方法createDriverEnv最终调用create创建SparkEnv。SparkEnv的构造步骤如下:

  1. 创建安全管理器SecurityManager;
  2. 创建基于Akka的分布式消息系统ActorSystem;
  3. 创建Map任务输出跟踪器mapOutputTracker;
  4. 实例化ShuffleManager;
  5. 创建ShuffleMemoryManager;
  6. 创建块传输服务BlockTransferService;
  7. 创建BlockManagerMaster;
  8. 创建块管理器BlockManager;
  9. 创建广播管理器BroadcastManager;
  10. 创建缓存管理器CacheManager;
  11. 创建HTTP文件服务器HttpFileServer;
  12. 创建测量系统MetricsSystem;
  13. 创建SparkEnv

1.安全管理器SecurityManager

SecurityManager主要对权限、账号进行设置,如果使用Hadoop YARN作为集群管理器,则需要使用证书生成secret key登录,最后给当前系统设置默认的口令认证实例,此实例采用匿名内部类实现。代码如下

private val secretKey = generateSecretKey()

//使用HTTP连接设置口令认证 
if (authOn) { 
    Authenticator.setDefault(
            new Authenticator(){
                override def getPasswordAuthentication() : PasswordAuthentication = {
                    var passAuth: PasswordAuthentication - null
                val userInfo = getRequestingURL().getUserInfo()
                if (userInfo != null){
                    val parts = userInfo.split(":", 2)
                    passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
                    }
                    return passAuth
                }
            }
    )
}

2.基于Akka的分布式消息系统ActorSystem

ActorSystem是Spark中最基础的设施, Spark既使用它发送分布式消息, 又用它实现并发编程。 在Scala中只需要自定义类型继承Actor, 并且提供act方法, 就如同Java里实现Runnable接口,需要实现run方法一样。但是不能直接调用act方法, 而是通过发送消息的方式(Scala发送消息是异步的)传递数据。 如:

Actor ! message

Akka是Actor编程模型的高级类库, 类似千JDK 1.5之后越来越丰富的并发工具包, 简化了程序员并发编程的难度。 ActorSystem便是Akka提供的川千创建分布式消息通信系统的基础类。 
 

SparkEnv中创建ActorSystem时用到了AkkaUtils工具类,代码如下。AkkaUtils.createActorSystem方法用于启动ActorSystem。AkkaUtils使用了Utils的静态方法startServiceOnPort,startServiceOnPort最终会回调方法startService:Int => (T, Int),此外的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreateActorSystem方法完成的。Spark的Driver中Akka的默认访问地址是akka://sparkDriver,Spark的Executor中Akka的默认访问地址是akka://sparkExecutor。如果不指定ActorSystem的端口,那么所有节点的ActorSystem端口在每次启动时随机产生。

val (actorSystem, boundPort) = 
    Option(defaultActorSystem) match {
        case Some(as) => (as, port)
        case None =>
            val actorSystemName = if (isDriver) driverActorSystemName else
                executorActorSystemName
            AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf,             
                securityManager)
}
def createActorSystem(
        name: String,
        host: String,
        port: Int,
        conf: SparkConf
        securityManager: SecurityManager): (ActorSystem, Int) = {
    val startService: Int => (ActorSystem, Int) = { actualPort =>
        doCreateActorSystem(name, host, actualPort, conf, securityManager)
    }
    Utils.startServiceonPort(port, startService, conf, name)
}

3.map任务输出跟踪器mapOutputTracker

mapOutputTracker用于跟踪map阶段任务的输出状态,此状态便于reduce阶段任务获取地址及中间输出结果。每个map任务或者reduce任务都会有其唯一标识,分别为mapId和reduceId。每个reduce任务的输入可能是多个map任务的输出,reduce会到每个map任务的所有节点上拉取Block,这一过程叫shuffle。每批shuffle过程都有唯一的标识shuffleId。

首先是MapOutputTrackerMaster。MapOutputTrackerMaster内部使用mapStatuses:TimeStampedHashMap[Int, Array[MapStatus]]来维护跟踪各个map任务的输出状态。其中key对应shuffleId,Array存储各个map任务对应的状态信息MapStatus。由于MapStatus维护了map输出Block的地址BlockManagerId,所以reduce任务知道从何处获取map任务的中间输出。MapOutputTrackerMaster还使用cachedSerializedStatuses: TimeStampedHashMap[lnt,  Array[Byte]]维护序列化后的各个map任务的输出状态。 其中key对应shuffleld, Array存储各个序列化MapStatus生成的字节数组。

无论是Driver还是Executor, 最后都由mapOutputTracker的属性trackerActor有MapOutputTrackerMasterActor的引用,代码如下

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
    if (isDriver) {
        logInfo("Registering " + name)
        actorSystem.actorOf(Pros(newActor), name = name)
    } else {
        AkkaUtils.makeDriverRef(name, conf, actorSystem)
    }
    }
    
    val mapOutputTracker = if (isDriver) {
        new MapOutputTrackerMaster(conf)
    } else {
         new MapOutputTrackerWorker(conf)
}

    mapOutputTracker.trackerActor = registerOrLookup(
        "MapOutputTracker",
    new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf(MapOutputTrackerMaster], conf))

registerOrLookup方法通过调用AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,实际正是利用ActorSystem提供的分布式消息机制实现的。也体会到了Akka的便捷。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/629453.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号