栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark aggregateByKey函数使用

Spark aggregateByKey函数使用

Spark中 RDD 为一个抽象类,是一个分布式数据集

包含五个特征

1. 一个分区的列表

2. 一个计算函数compute,对每个分区进行计算

3. 对其他RDDs的依赖(宽依赖(有shuffle)、窄依赖(无shuffle))列表

4. 对 key-value RDDs 来说,存在一个分区器( Partitioner )【可选的】 5. 对每个分区有一个优先位置的列表【可选的】 PairRDD  key-value 类型的 RDD

首先参考下 aggregateByKey 源码

翻译过来大体意思是:

按照每个key聚合value值,使用给定的聚合函数和中立的初始值。这个函数能够返回一个不同的结果类型U,而不是RDD的values值类型V, 因此,我们我们需要一个操作把V类型的值合并到U中,一个操作将两个U合并,就像scala的TraversableOnce。前一个操作用于合并分区内的值,后面的操作是分区间的合并,为了避免内存重新分配,这两个函数都可以修改和返回他们的第一个参数,而不是创建一个新的U类型的参数

def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]

该函数为柯里化方法,输入参数分为了两部分:(zeroValue: U)与(seqOp: (U, V) => U, combOp: (U, U) => U)

zeroValue:初始值  (U类型)

seqOp:每个分区内的累积结果 (结果为U类型)

combOp: 分区之间 (一个关联运算符用于组合不同分区的结果)(结果为U类型)

例子:

给定一组数据

val rdd = sc.makeRDD(Array(("spark", 12), ("hadoop", 26), ("hadoop", 23), ("spark", 15), ("scala", 26), ("spark", 25), ("spark", 23), ("hadoop", 16), ("scala", 24), ("spark", 16)))

键值对中的 key 表示图书名称, value 表示某天图书销量。计算每个键对应的平均值,也就是计算每种图书的每天平均销量 //实现步骤  1、按照图书汇总总的销量  、汇总天数  2、相除   

注:最后的结果都是由 初始值最后的结果操作得来

方法一:

1、scala> val rdd2 = rdd.aggregateByKey((0,0))((x,y)=>(x._1+y,x._2+1),(a,b)=>(a._1+b._1,a._2+b._2))
rdd2: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[4] at aggregateByKey at :25

//其中zeroValue 初始值为 (0,0)        类型为 元组(U)

// seqOp 为 (x,y)=>(x._1+y,x._2+1)  (x,y) 类型为 (元组(U),V(key,value中的value) )

// combOp为(a,b)=>(a._1+b._1,a._2+b._2)

scala> rdd2.collect
res7: Array[(String, (Int, Int))] = Array((spark,(91,5)), (hadoop,(65,3)), (scala,(50,2)))

//初始值最后的结果集

2、scala> rdd2.mapValues(x=>x._1.toDouble/x._2).collect
res8: Array[(String, Double)] = Array((spark,18.2), (hadoop,21.666666666666668), (scala,25.0))

//初始值最后的结果集第一个值/第二个值得到最终结果集 方法二: // scala.collection.mutable.ArrayBuffer[Int] () 定义一个初始值 scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int] ())((x, y) => {x.append(y); x}, (a, b) => {a++b} ).collect
res9: Array[(String, scala.collection.mutable.ArrayBuffer[Int])] = Array((spark,ArrayBuffer(15, 16, 12, 25, 23)), (hadoop,ArrayBuffer(23, 16, 26)), (scala,ArrayBuffer(26, 24)))
scala> rdd.aggregateByKey(scala.collection.mutable.ArrayBuffer[Int] ())((x, y) => {x.append(y); x}, (a, b) => {a++b} ).mapValues(x=>x.sum.toDouble/x.size).collect
res10: Array[(String, Double)] = Array((spark,18.2), (hadoop,21.666666666666668), (scala,25.0))
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/736034.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号