栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Spark 源码】1-Spark RPC

【Spark 源码】1-Spark RPC

Spark 的 RPC 概述         Spark 是一个 通用的分布式计算系统,既然是分布式的,必然存在很多节点之间的通信,那么 Spark不同组件之间就会通过 RPC ( Remote Procedure Call )进行点对点通信。以下所有都是基于spark2.4.X版本进行分析学习的,该版本也是当前CDH 稳定版本         spark中网络通信无处不在,例如 
-driver 和 master 的通信,比如 driver 会向 master 发送 RegisterApplication 消息 - master 和 worker 的通信,比如 worker 会向 master 上报 worker 上运行 Executor 信息 - executor 和 driver 的的通信, executor 运行在 worker 上, spark 的 tasks 被分发到运行在各个 executor 中, executor 需要通过向 driver 发送任务运行结果。 - worker 和 worker 的通信, task 运行期间需要从其他地方 fetch 数据,这些数据是由运行在其他 worker上的executor 上的 task 产生,因此需要到 worker 上 fetch 数据
总结起来通信主要存在两个方面: 1. 汇集信息,例如 task 变化信息, executor 状态变化信息。 2. 传输数据, spark shuffle (也就是 reduce 从上游 map 的输出中汇集输入数据)阶段存在大量的数据传输。 RPC版本迭代

        早期版本是akka 和netty 共同组成的,Netty 做大数据传输,Akka 做组件之前的内部通信,在Spark2.0 Akka 完全被放弃了,全部都是依赖于Netty ,包括组件之前的通信、用户上传配置文件、Jar包、数据传输等。

SparkRPC 的原理 Akka工作原理

        在介绍新版本rpc 组成之前,先看下Akka 的大致工作原理,方便我们后续进行对比比较

  •  ActorSystem 管理通信角色 actor 的一个系统概念,在一个服务器节点中,只要存在一个这样的对象就可以,这个对象的作用,就是用来生成和管理所有的通信角色的生命周期
  • 通信角色 Actor,存在于一台服务器中的一个 ActorSystem 的内部,用来和其他节点的 actor 进行通信。每个 Actor 都有一个 MailBox,别的Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。
  • Actor 要和另外一个 Actor 进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
  • 消息可以选择进行异步、同步、有反馈、无反馈的进行发送。
Sprak Netty          spark 基于 netty 新的 rpc 框架借鉴了 Akka 的中的设计,它是基于 Actor模型,各个组件可以认为是一个 个独立的实体,各个实体之间通过消息来进行通信。           
  • RpcEnv 类似于 ActorSystem,服务端和客户端都可以使用它来做通信。
  • RpcEndPoint 是一个可以相应请求的服务,类似于 Akka 中的 Actor 。其中有 receive 方法用来接收客户端发送过来的信息,也有 receiveAndReply 方法用来接收并应答,应答通过 RpcContext 回调。
  • RpcEndpointRef  类似于 Akka 中的 ActorRef ,是 RpcEndPoint 的引用,持有远程 RpcEndPoint 的地址名称等,提供了 send 方法和 ask 方法用于发送请求。

Spark RPC 类图

 源码解析 RpcEnv 
  • RpcEnvFactory负责创建

  • 负责RpcEndpoint的整个生命周期管理

  • 注册或者销毁Endpoint

  • 解析TCP层的数据包并反序列化对相应序列化后通过TPC传输到远端

  • 路由请求到指定的Endpoint

 核心方法

注册一个RPCEndpoint 到SparkRPC 中

    def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
RpcEndpoint生命周期

onStart    作为初始化方法,在接受消息之前进行预处理

onStop.  在消息处理完毕后进行收尾操作 

 receive()为单次接收不进行回复,对应的是 RpcEndpointRef.send

 receiveAndReply 为接收消息,并进行相关回复,对应的是RpcEndpointRef.ask

 Dispatcher 消息路由
  • 调用Inbox的post方法将消息放入message
  • 将有消息的Inbox相关联的EndpointData放入receivers
  • MessageLoop每次循环先从receivers中获取EndpointData
  • 执行EndpointData中的Inbox的process方法对消息进行处理

 由上图代码看出是先从EndpointData找出对应的Inbox,然后对消息进行处理。

 InBox OutBox

对消息进行缓冲 存储

OutboxMessage 存储在OutBox 中,InboxMessage 存储在Inbox 中

客户端发送RPC时序图

 注:本文源码均为2.4.x 版本 即当前CDH 稳定运行版本!后续无特殊说明均为该版本

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/602102.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号