Spark中 RDD 为一个抽象类,是一个分布式数据集
包含五个特征
1. 一个分区的列表
2. 一个计算函数compute,对每个分区进行计算
3. 对其他RDDs的依赖(宽依赖(有shuffle)、窄依赖(无shuffle))列表
4. 对 key-value RDDs 来说,存在一个分区器( Partitioner )【可选的】 5. 对每个分区有一个优先位置的列表【可选的】 PairRDD key-value 类型的 RDD首先参考下 aggregateByKey 源码
翻译过来大体意思是:
按照每个key聚合value值,使用给定的聚合函数和中立的初始值。这个函数能够返回一个不同的结果类型U,而不是RDD的values值类型V, 因此,我们我们需要一个操作把V类型的值合并到U中,一个操作将两个U合并,就像scala的TraversableOnce。前一个操作用于合并分区内的值,后面的操作是分区间的合并,为了避免内存重新分配,这两个函数都可以修改和返回他们的第一个参数,而不是创建一个新的U类型的参数
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
该函数为柯里化方法,输入参数分为了两部分:(zeroValue: U)与(seqOp: (U, V) => U, combOp: (U, U) => U)
zeroValue:初始值 (U类型)
seqOp:每个分区内的累积结果 (结果为U类型)
combOp: 分区之间 (一个关联运算符用于组合不同分区的结果)(结果为U类型)
例子:
给定一组数据
val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))
键值对中的 key 表示图书名称, value 表示某天图书销量。计算每个键对应的平均值,也就是计算每种图书的每天平均销量 //实现步骤 1、按照图书汇总总的销量 、汇总天数 2、相除注:最后的结果都是由 初始值最后的结果操作得来
方法一:
1、scala> val rdd2 = rdd.aggregateByKey((0,0))((x,y)=>(x._1+y,x._2+1),(a,b)=>(a._1+b._1,a._2+b._2))
rdd2: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[4] at aggregateByKey at
//其中zeroValue 初始值为 (0,0) 类型为 元组(U)
// seqOp 为 (x,y)=>(x._1+y,x._2+1) (x,y) 类型为 (元组(U),V(key,value中的value) )
// combOp为(a,b)=>(a._1+b._1,a._2+b._2)
scala> rdd2.collect
res7: Array[(String, (Int, Int))] = Array((spark,(91,5)), (hadoop,(65,3)), (scala,(50,2)))
//初始值最后的结果集
2、scala> rdd2.mapValues(x=>x._1.toDouble/x._2).collect
res8: Array[(String, Double)] = Array((spark,18.2), (hadoop,21.666666666666668), (scala,25.0))
res9: Array[(String, scala.collection.mutable.ArrayBuffer[Int])] = Array((spark,ArrayBuffer(15, 16, 12, 25, 23)), (hadoop,ArrayBuffer(23, 16, 26)), (scala,ArrayBuffer(26, 24)))
scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int] ())((x, y) => {x.append(y); x}, (a, b) => {a++b} ).mapValues(x=>x.sum.toDouble/x.size).collect
res10: Array[(String, Double)] = Array((spark,18.2), (hadoop,21.666666666666668), (scala,25.0))



