Spark 的 RPC 概述
Spark 是一个 通用的分布式计算系统,既然是分布式的,必然存在很多节点之间的通信,那么
Spark不同组件之间就会通过 RPC
(
Remote Procedure Call
)进行点对点通信。以下所有都是基于spark2.4.X版本进行分析学习的,该版本也是当前CDH 稳定版本
spark中网络通信无处不在,例如
-driver
和
master
的通信,比如
driver
会向
master
发送
RegisterApplication
消息
- master
和
worker
的通信,比如
worker
会向
master
上报
worker
上运行
Executor
信息
- executor
和
driver
的的通信,
executor
运行在
worker
上,
spark
的
tasks
被分发到运行在各个
executor
中,
executor
需要通过向
driver
发送任务运行结果。
- worker
和
worker
的通信,
task
运行期间需要从其他地方
fetch
数据,这些数据是由运行在其他
worker上的executor
上的
task
产生,因此需要到
worker
上
fetch
数据
总结起来通信主要存在两个方面:
1.
汇集信息,例如
task
变化信息,
executor
状态变化信息。
2.
传输数据,
spark shuffle
(也就是
reduce
从上游
map
的输出中汇集输入数据)阶段存在大量的数据传输。
RPC版本迭代
-driver 和 master 的通信,比如 driver 会向 master 发送 RegisterApplication 消息 - master 和 worker 的通信,比如 worker 会向 master 上报 worker 上运行 Executor 信息 - executor 和 driver 的的通信, executor 运行在 worker 上, spark 的 tasks 被分发到运行在各个 executor 中, executor 需要通过向 driver 发送任务运行结果。 - worker 和 worker 的通信, task 运行期间需要从其他地方 fetch 数据,这些数据是由运行在其他 worker上的executor 上的 task 产生,因此需要到 worker 上 fetch 数据总结起来通信主要存在两个方面: 1. 汇集信息,例如 task 变化信息, executor 状态变化信息。 2. 传输数据, spark shuffle (也就是 reduce 从上游 map 的输出中汇集输入数据)阶段存在大量的数据传输。 RPC版本迭代
早期版本是akka 和netty 共同组成的,Netty 做大数据传输,Akka 做组件之前的内部通信,在Spark2.0 Akka 完全被放弃了,全部都是依赖于Netty ,包括组件之前的通信、用户上传配置文件、Jar包、数据传输等。
SparkRPC 的原理 Akka工作原理在介绍新版本rpc 组成之前,先看下Akka 的大致工作原理,方便我们后续进行对比比较
- ActorSystem 管理通信角色 actor 的一个系统概念,在一个服务器节点中,只要存在一个这样的对象就可以,这个对象的作用,就是用来生成和管理所有的通信角色的生命周期
- 通信角色 Actor,存在于一台服务器中的一个 ActorSystem 的内部,用来和其他节点的 actor 进行通信。每个 Actor 都有一个 MailBox,别的Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。
- Actor 要和另外一个 Actor 进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
- 消息可以选择进行异步、同步、有反馈、无反馈的进行发送。
- RpcEnv 类似于 ActorSystem,服务端和客户端都可以使用它来做通信。
- RpcEndPoint 是一个可以相应请求的服务,类似于 Akka 中的 Actor 。其中有 receive 方法用来接收客户端发送过来的信息,也有 receiveAndReply 方法用来接收并应答,应答通过 RpcContext 回调。
- RpcEndpointRef 类似于 Akka 中的 ActorRef ,是 RpcEndPoint 的引用,持有远程 RpcEndPoint 的地址名称等,提供了 send 方法和 ask 方法用于发送请求。
-
RpcEnvFactory负责创建
-
负责RpcEndpoint的整个生命周期管理
-
注册或者销毁Endpoint
-
解析TCP层的数据包并反序列化对相应序列化后通过TPC传输到远端
-
路由请求到指定的Endpoint
核心方法
注册一个RPCEndpoint 到SparkRPC 中
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
RpcEndpoint生命周期
onStart 作为初始化方法,在接受消息之前进行预处理
onStop. 在消息处理完毕后进行收尾操作
receive()为单次接收不进行回复,对应的是 RpcEndpointRef.send
receiveAndReply 为接收消息,并进行相关回复,对应的是RpcEndpointRef.ask
Dispatcher 消息路由- 调用Inbox的post方法将消息放入message
- 将有消息的Inbox相关联的EndpointData放入receivers
- MessageLoop每次循环先从receivers中获取EndpointData
- 执行EndpointData中的Inbox的process方法对消息进行处理
由上图代码看出是先从EndpointData找出对应的Inbox,然后对消息进行处理。
InBox OutBox对消息进行缓冲 存储
OutboxMessage 存储在OutBox 中,InboxMessage 存储在Inbox 中
客户端发送RPC时序图注:本文源码均为2.4.x 版本 即当前CDH 稳定运行版本!后续无特殊说明均为该版本



