1、groupBy算子
groupBy算子如其名,分组算子。但是我们需要制定分组函数。它和groupByKey不同,groupByKey直接按照key分组。
源码部分:
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])] = withScope {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
通过源码发现,groupBy的底层任仍然是groupByKey算子,经过map得到灭搁置的一个函数标签,类似于键值对,在按照groupByKey分分组得到RDD[(K, Iterable[T])]这样的结果类型。
用法:
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("BOKE")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
val value: RDD[(Boolean, Iterable[Int])] = rdd.groupBy((x: Int) => x % 2 == 0)
println(value.count())
println(value.max())
value.foreach((iter: (Boolean, Iterable[Int])) =>{
println(iter._2.toList.mkString(","))
})
}
在上面的groupBy后,我们可以做一些分组统计,比如:max、min、count等等。
也可以直接输出进行下一步的操作。
2、groupByKey算子
groupByKey类似于groupBy算子,就是分组,但是groupByKey是根据键值对的键值进行分组。返回值是RDD[String, Iterable[Int]]
源码:
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self))
}
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
val createCombiner = (v: V) => CompactBuffer(v)
val mergevalue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergevalue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
用法:
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("BOKE")
val sc = new SparkContext(conf)
val rdd: RDD[String] = sc.makeRDD(List("spark", "hello", "hadoop","hello"))
rdd.map((_: String,1)).groupByKey().
map((x: (String, Iterable[Int])) =>(x._1,x._2.sum))
.foreach(println)
以上的实例就是wordCount的经典例子。
这里groupByKey就是一个简单的按照key分组的例子,里面不需要传参数,没有分区内分组聚合的功能,随着数据量的增加,shuffle过程中,可能会产生OOM。
3、reduceByKey算子
reduceByKey算子是多键值的分组聚合算子,它和groupByKey算子最大的区别是可以传入聚合函数,支持分组内聚合,会减少shuffle的数量。
源码:
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
reduceByKey(new HashPartitioner(numPartitions), func)
}
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
reduceByKey(defaultPartitioner(self), func)
}
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}



