今天来讨论一下spark里面的闭包问题,当用户提交了一个用scala语言写的Spark程序,首先这个Spark程序就是一个“Application”,程序里面的mian函数就是“Driver Program”,dirver程序的可能运行在客户端,也有可有可能运行在spark集群中,这取决于spark作业提交时参数的选定,比如,yarn-client和yarn-cluster就是分别运行在客户端和spark集群中。在driver程序中会有RDD对象的相关代码操作,它们是在Worker节点上面运行的,所以spark会透明地帮用户把这些涉及到RDD操作的代码传给相应的worker节点。
如果在RDD map函数中调用了在函数外部定义的对象,因为这些对象需要通过网络从driver所在节点传给其他的worker节点,所以要求这些类是可序列化的,比如在Java或者scala中实现Serializable类,除了java这种序列化机制,还可以选择其他方式,使得序列化工作更加高效。
使用场景worker节点接收到程序之后,在spark资源管理器的指挥下运行RDD程序。
在worker节点上所运行的RDD中代码的变量是保存在worker节点上面的,在spark编程中,很多时候用户需要在driver程序中进行相关数据操作之后把该数据传给RDD对象的方法以做进一步处理,这时候,spark框架会自动帮用户把这些数据通过网络传给相应的worker节点。
除了这种以变量的形式定义传输数据到worker节点之外,Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。其中累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。
这是因为相比于变量的方式,在一定场景下使用broadcast比较有优势,因为所广播的数据在每一个worker节点上面只存一个副本,而在spark算子中使用到的外部变量会在每一个用到它的task中保存一个副本,即使这些task在同一个节点上面。
所以当数据量比较大的时候,建议使用广播而不是外部变量。
使用过程(1) 通过对一个类型 T 的对象调用 SparkContext.broadcast 创建出一个 Broadcast[T] 对象。任何可序列化的类型都可以这么实现。
(2) 通过 value 属性访问该对象的值(在 Java 中为 value() 方法)。
(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。
val listBC = spark.sparkContext.broadcast(list) //广播 listBC.value //读取
举个例子:
list是在driver端创建的,但是因为需要在excutor端使用,所以driver会把list以task的形式发送到excutor端,如果有很多个task,就会有很多给excutor端携带很多个list,如果这个list非常大的时候,就可能会造成内存溢出(如下图所示)。这个时候就引出了广播变量。
1.不能将一个RDD使用广播变量广播出去,因为RDD是不存储数据的。可以将RDD的结果广播出去。
2.广播变量只能在Driver端定义,不能在Executor端定义。
3.在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。



