栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark源码之通信环境

Spark源码之通信环境

Spark通信
      • 通信组件
        • RpcEndpoint,RpcEnv,RpcEndpointRef,RpcAddress
        • TransportServer,TransportClient,Outbox,Inbox,Dispatcher
      • Driver端
      • Executor端

通信组件

为了了解spark的通信环境,我们需要了解它的主要组件。

RpcEndpoint,RpcEnv,RpcEndpointRef,RpcAddress

首先是RpcEndpoint:


一个通信终端有他自己的生命周期:

constructor -> onStart -> receive* -> onStop

这里有几个问题,第一个,他是干什么的?


他主要是来收消息的。

如果是用来收消息的,必然会和Inbox有关,我们先放着。

第二个问题,他持有什么对象?


一个RpcEndpoint需要将自己注册到RpcEnv中。

那么,什么是RpcEnv呢?


RpcEnv会处理从RpcEndpointRef中发来的消息,并且将递给其相应的RpcEndpoint。

所以我们知道了,消息是从RpcEndpointRef那里过来的,所以这个东西应该要有一个发件箱来发送消息。


RpcEndpointRef,其实就是远程RpcEndpoint的一个引用。


通过它的方法可以判断他就是用来发送消息的。

所以,究竟是如何向远程发送消息的?

假设我现在要向远程的RpcEndpoint发送消息,我就必须要拿到它的引用,即RpcEndpointRef,通过该引用就能发消息了。

在RpcEnv与RpcEndpointRef中,都持有了一个对象,他就是RpcAddress:


所以这个RpcAddress其实就是一个主机+端口的地址。

TransportServer,TransportClient,Outbox,Inbox,Dispatcher

如果要通信,肯定要有服务端和客户端。

我们的服务端叫做TransportServer:


客户端叫做TransportClient:


而且我们猜到,这个TransportClient一定与Outbox有关:

在发件箱里,正好有这个客户端,通过它可以将消息发到远程的服务端。

我们看到了Outbox,当然,我们也想看看Inbox:


收件箱是与RpcEndpoint相对应的。

接下来的问题就是,谁持有了Outbox以及Inbox。


一个叫DedicatedMessageLoop的组件持有了收件箱。

并且,它里面还有一个线程池,这个线程池的每个线程都在跑一个任务:


这个任务叫receiveLoop。

他先会从active里面取收件箱,这个active是阻塞队列:

  // List of inboxes with pending messages, to be processed by the message loop.
  private val active = new linkedBlockingQueue[Inbox]()

初始化的时候肯定没有Inbox,所以会阻塞在这里。

我们再注意一下,DedicatedMessageLoop还持有了Dispatcher(从主构造方法可以看到),我们等等看该组件。

那么谁持有了Outbox呢?


它就是NettyRpcEnv。

NettyRpcEnv继承了RpcEnv,我们可以这么理解,spark通信框架的底层就是使用了netty。

从注释我们可以看到,一个RcpAddress对应着一个Outbox,相当于是取到远程地址就可以利用发件箱发送消息了。

最后,我们注意看Dispatcher:


这个Dispatcher是用来路由消息的。

endpoints是以RpcEndpoint名称为key,以MessageLoop为value的Map。

endpointRefs是以RpcEndpoint为key,以RpcEndpointRef为value的Map。

通过这两个map我们也可以猜到他是server和client中间的一个角色。

Driver端

从Driver的通信环境搭建看起。

从SparkContext进入:

创建spark环境。

一路create,直到:


这个create方法是driver和executor共用的,也就是说,等下executor创建环境的话也要走这里。


接着创建。


走进创建NettyRpcEnv那里。


注意到里面初始化了Dispatcher和outboxes。

创建好NettyRpcEnv之后就要启动server了。

它第一步创建server,第二步注册RpcEndpoint。

先走进创建server:

就是new了一个TransportServer。

然后是注册RpcEndpoint:

它首先拿到RpcEndpoint的地址,然后根据这个地址new出一个RpcEndpointRef,这个RpcEndpointRef就是给executor用的。

然后将RpcEndpoint和其对应的RpcEndpointRef放进一个ConcurrentHashMap中,就是endpointRefs。

因为Dispatcher是一个中间者,他是来调度消息的。


接着他问我们的endpoint是不是IsolatedRpcEndpoint。

不管是driver端的,还是executor端的,它们的端点都是IsolatedRpcEndpoint。

所以,就像之前说的,就会阻塞等消息。

Executor端

executor同样要进入创建rpc环境的代码,和driver是一样的,只不过不会像driver一样去startServer。

executor准备好rpc环境后(和driver一样,也会有一个TransportServer,TransportClient,RpcEndpoint,RpcEndpointRef,Inbox,Outbox),它会去和driver取得联系:


点下去:



他在这里创建了两个RpcEndpointRef,其中的verifier是去询问对方有没有对应的RpcEndpoint,找到了的话就成功返回RpcEndpointRef。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/329810.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号