- 通信组件
- RpcEndpoint,RpcEnv,RpcEndpointRef,RpcAddress
- TransportServer,TransportClient,Outbox,Inbox,Dispatcher
- Driver端
- Executor端
为了了解spark的通信环境,我们需要了解它的主要组件。
RpcEndpoint,RpcEnv,RpcEndpointRef,RpcAddress首先是RpcEndpoint:
一个通信终端有他自己的生命周期:
constructor -> onStart -> receive* -> onStop
这里有几个问题,第一个,他是干什么的?
他主要是来收消息的。
如果是用来收消息的,必然会和Inbox有关,我们先放着。
第二个问题,他持有什么对象?
一个RpcEndpoint需要将自己注册到RpcEnv中。
那么,什么是RpcEnv呢?
RpcEnv会处理从RpcEndpointRef中发来的消息,并且将递给其相应的RpcEndpoint。
所以我们知道了,消息是从RpcEndpointRef那里过来的,所以这个东西应该要有一个发件箱来发送消息。
RpcEndpointRef,其实就是远程RpcEndpoint的一个引用。
通过它的方法可以判断他就是用来发送消息的。
所以,究竟是如何向远程发送消息的?
假设我现在要向远程的RpcEndpoint发送消息,我就必须要拿到它的引用,即RpcEndpointRef,通过该引用就能发消息了。
在RpcEnv与RpcEndpointRef中,都持有了一个对象,他就是RpcAddress:
所以这个RpcAddress其实就是一个主机+端口的地址。
如果要通信,肯定要有服务端和客户端。
我们的服务端叫做TransportServer:
客户端叫做TransportClient:
而且我们猜到,这个TransportClient一定与Outbox有关:
在发件箱里,正好有这个客户端,通过它可以将消息发到远程的服务端。
我们看到了Outbox,当然,我们也想看看Inbox:
收件箱是与RpcEndpoint相对应的。
接下来的问题就是,谁持有了Outbox以及Inbox。
一个叫DedicatedMessageLoop的组件持有了收件箱。
并且,它里面还有一个线程池,这个线程池的每个线程都在跑一个任务:
这个任务叫receiveLoop。
他先会从active里面取收件箱,这个active是阻塞队列:
// List of inboxes with pending messages, to be processed by the message loop. private val active = new linkedBlockingQueue[Inbox]()
初始化的时候肯定没有Inbox,所以会阻塞在这里。
我们再注意一下,DedicatedMessageLoop还持有了Dispatcher(从主构造方法可以看到),我们等等看该组件。
那么谁持有了Outbox呢?
它就是NettyRpcEnv。
NettyRpcEnv继承了RpcEnv,我们可以这么理解,spark通信框架的底层就是使用了netty。
从注释我们可以看到,一个RcpAddress对应着一个Outbox,相当于是取到远程地址就可以利用发件箱发送消息了。
最后,我们注意看Dispatcher:
这个Dispatcher是用来路由消息的。
endpoints是以RpcEndpoint名称为key,以MessageLoop为value的Map。
endpointRefs是以RpcEndpoint为key,以RpcEndpointRef为value的Map。
通过这两个map我们也可以猜到他是server和client中间的一个角色。
Driver端从Driver的通信环境搭建看起。
从SparkContext进入:
创建spark环境。
一路create,直到:
这个create方法是driver和executor共用的,也就是说,等下executor创建环境的话也要走这里。
接着创建。
走进创建NettyRpcEnv那里。
注意到里面初始化了Dispatcher和outboxes。
创建好NettyRpcEnv之后就要启动server了。
它第一步创建server,第二步注册RpcEndpoint。
先走进创建server:
就是new了一个TransportServer。
然后是注册RpcEndpoint:
它首先拿到RpcEndpoint的地址,然后根据这个地址new出一个RpcEndpointRef,这个RpcEndpointRef就是给executor用的。
然后将RpcEndpoint和其对应的RpcEndpointRef放进一个ConcurrentHashMap中,就是endpointRefs。
因为Dispatcher是一个中间者,他是来调度消息的。
接着他问我们的endpoint是不是IsolatedRpcEndpoint。
不管是driver端的,还是executor端的,它们的端点都是IsolatedRpcEndpoint。
所以,就像之前说的,就会阻塞等消息。
Executor端executor同样要进入创建rpc环境的代码,和driver是一样的,只不过不会像driver一样去startServer。
executor准备好rpc环境后(和driver一样,也会有一个TransportServer,TransportClient,RpcEndpoint,RpcEndpointRef,Inbox,Outbox),它会去和driver取得联系:
点下去:
他在这里创建了两个RpcEndpointRef,其中的verifier是去询问对方有没有对应的RpcEndpoint,找到了的话就成功返回RpcEndpointRef。



