- 一,测试程序
- 二,groupBy分区数如何确定
- 三,groupBy与groupByKey的关系
- 四,结论
spark中group转换操作会将数据分为一到几个组,分组的数量与分区数量是否有关系?group与groupBy有什么关系?
一,测试程序object WordCount extends Logging{
def main(args: Array[String]): Unit = {
log.info("----------------")
val sparkConnf=new SparkConf().setAppName("wordCount").setMaster("local[3]")
val sparkContext=new SparkContext(sparkConnf)
val rdd = sparkContext.parallelize(Array(1, 2, 3, 4, 5), 1)
log.info("=======rdd partitions size:"+rdd.partitions.length)
//按奇偶数分为2组
val groupRdd = rdd.groupBy(_ % 2)
log.info("=======groupRdd partitions size:"+rdd.partitions.length)
groupRdd.collect().map(x=>log.info("======="+x._1+":"+x._2))
sparkContext.stop()
}
}
执行结果如下,分组数是2,分区数是1。
接下来查看源码了解其中原理
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
groupBy[K](f, defaultPartitioner(this))
}
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
Some(hasPartitioner.maxBy(_.partitions.length))
} else {
None
}
val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
rdd.context.defaultParallelism
} else {
rdds.map(_.partitions.length).max
}
// If the existing max partitioner is an eligible one, or its partitions number is larger
// than or equal to the default number of partitions, use the existing partitioner.
if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
defaultNumPartitions <= hasMaxPartitioner.get.getNumPartitions)) {
hasMaxPartitioner.get.partitioner.get
} else {
new HashPartitioner(defaultNumPartitions)
}
查看defaultPartitioner(rdd: RDD[], others: RDD[]*) 函数源码可以得出如下信息:
默认情况下会创建一个类型为HashPartitioner的分区器,分区数量为驱动程序中SparkContext对象spark.default.parallelism参数指定的值,如果该值为空,则为父RDD的最大分区数。
继续跟踪 groupBy[K](f, defaultPartitioner(this))调用
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
: RDD[(K, Iterable[T])] = withScope {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
this.map(t => (cleanF(t), t)) 创建了一个类型为MapPartitionsRDD的子RDD,并将RDD的数据类型转变为元组(_ , _)类型,第一个元素为groupBy操作中指定的函数转换后的值,第二个元素为原来的值,然后调用groupByKey*方法。
这里有一点需要注意,groupByKey是PairRDDFunctions的类方法,MapPartitionsRDD并不是PairRDDFunctions类子类,它是如何调用groupByKey方法的?答案是MapPartitionsRDD的父类RDD类的半生对象中定义了隐式转换函数rddToPairRDDFunctions.
1,groupBy操作结果中的分组数量与rdd的分区数量毫无关系。
2,groupBy底层调用了groupByKey。



