Spark目前支持Hash分区和Range分区,和用户自定义分区。
Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。
只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。
Hash分区:对于给定的key,计算其hashCode,并除以分区个数取余。
如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。
如源码所示:
def getPartition(key: Any): Int = key match {
case null => 0
case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
}
//key的hashcode % 分区个数 ,余数<0 用余数+分区的个数(否则加0)
def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
Range分区:
HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。
RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。
使用到了Reservoir sampling(水塘抽样)算法,简单的说就是将一定范围内的数映射到某一个分区内。实现过程为:
第一步:先从整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds(边界范围);
第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的
自定义分区器object spark_mypartitioner {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("flatmap")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(
("apple","xxx"),
("pig","xxx"),
("apple","xxx"),
("tree","xxx")
),2)
val partitionRdd = rdd.partitionBy(new MyPartitioner())
partitionRdd.saveAsTextFile("datas/MyPartitioner")
}
}
class MyPartitioner extends Partitioner{
//分区数量
override def numPartitions: Int = 3
//根据key 返回分区索引 从0开始
override def getPartition(key: Any): Int = {
key match {
case "apple" => 0
case "pig" => 1
case _ => 2
}
}
}



