栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark源码阅读03-Spark存储原理之共享变量

Spark源码阅读03-Spark存储原理之共享变量

共享变量

通常情况下, 当一个函数传递给远程集群节点上运行的Spark操作时(如Map、Reduce), 该函数中所有的变量都会在各节点中创建副本, 在各节点中的变量相互隔离并由所在节点的函数进行调用, 并且这些变量的更新都不会传递回Driver程序。 在任务间进行通用、 可读写的共亨变量是低效的, 然而Spark还是提供了两种类型的共享变昼:广播变量和累加器

广播变量

广播变量允许开发人员在每个节点缓存只读的变量, 而不足在任务之间传递这些变量。 例如,使用广播变量能够高效地在集群每个节点创建大数据集的副本。同时,Spark还使用高效的广播算法分发这些变量, 从而减少通信的开销。

Spark应用桯序作业的执行由一系列调度阶段构成,而这些调度阶段通过Shuffle进行分隔。 Spark能够在每个调度阶段自动广播任务所需通用的数据,这些数据在广播时需进行序列化缓存, 并在任务运行前需进行反序列化。 这就意味着当多个调度阶段的任务需要相同的数据, 显式地创建广播变量才有用。

可以通过调用SparkContext.broadcast(v)创建一个广播变量v,该广播变量封装在v变量中,可使用获取该变量value的方法进行访问。

代码实现如下:

scala> val broadcastVar = sc.broadcast (Array(1, 2, 3)) 
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(O) 

scala> broadcastVar.value 
res0: Array[Int] = Array(1,2,3)

当广播变量创建后, 在集群中所有函数将以变量v代表该广播变量, 并且该变量v 一次性分发到各节点上。另外,为了确保所有的节点获得相同的变量,对象v广播后只读不能够被修改。

累加器

累加器是Spark中仅有通过关联操作进行累加的变量, 因此能够有效地支持并行计算, 它们能够用于计数(如MapReduce)和求和。 Spark原生支持数值类型的累加器, 不过开发人员能够定义新的类型。如果在创建累加器时指定了名称, 可以通过Spark的UI监控界面中进行查看, 这种方式能够帮助理解作业所构成的调度阶段执行过程。

通过调用SparkContext.accumulator(v)方法初始化累加器变量V,在集群中的任务能够使用加法或者"+="操作符进行累加操作(在Scala和Python中)。然而, 它们不能在应用程序中读取这些值, 只能由Driver程序通过读方法获取这些累加器的值。

下面代码演示如何把一个数组的元素追加到累加器中:

scala> val accum = sc.accumulator (0, "My Accumulator") 
accum: spark.Accumulator[Int] = 0 

scala> Sc.parallelize(Array(l, 2, 3, 4)).foreach(x => accum += x) 
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s 

scala> accum.value 
res2: Int = 10 

尽管上面的例子使用Spark原生所支持的累加器Int类型, 但是开及人员能够通过继承AccumulatorParam类来创建自定义的累加类型。 AccumulatorParam接口提供了两个方法: zero 方法为自定义类型设置 ”0值 " 和addInPlace方法将两个变量进行求和。例如, 下面将对Vector 类所提供的向量vector进行求和, 代码如下:

object VectorAccumulatorParam extends AccumulatorParam[MyVector, MyVector] {
  
  def zsro(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
  ...
}
 
// 可以创建向量的累加器变量
val vecAccum = sc.accumulator(new Vetor(...))(VectorAccumulatorParam)

在Scala中, 尽管结果的类型和累加元素的数据类型可能存在不一致的情况,Spark提供更通用的接口来累加数据(例如, 通过创建一个列表来容纳累加的元素), 另外SparkContext. accumulableCollection提供了通用的方法来累加Scala集合类型。

累加器只能由Spark内部进行更新, 并保址每个任务在累加器的更新操作仅执行一次, 也就是说, 重启任务也不应该更新。 在转换操作中, 用户必须意识到任务和作业的调度过程重新执行会造成累加器多次更新。

累加器同样具有Spark懒加载的求值模型。 如果它们在RDD的操作中进行更新, 它们的值只在RDD进行行动操作时才进行更新。 因此,当执行如Map懒加载操作时, 累加器并没有立即更新。 以下代码片段演示该特性:

//此时accum的值仍然是o. 因为没有动作操作引起map的计算
val accum = sc.accumulator(O) 
data.map { 
	x => accum += x; f(x)
 } 
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/653645.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号