栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

(8)RDD分区器

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

(8)RDD分区器

RDD分区器

Spark目前支持Hash分区和Range分区,和用户自定义分区。

Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区,进而决定了Reduce的个数。

只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

每个RDD的分区ID范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

Hash分区:

对于给定的key,计算其hashCode,并除以分区个数取余。
如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。

如源码所示:

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
  //key的hashcode % 分区个数 ,余数<0 用余数+分区的个数(否则加0)
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }
Range分区:

HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据。

RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。

使用到了Reservoir sampling(水塘抽样)算法,简单的说就是将一定范围内的数映射到某一个分区内。实现过程为:

第一步:先从整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds(边界范围);

第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的

自定义分区器
object spark_mypartitioner {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("flatmap")
    val sc = new SparkContext(sparkConf)

    val rdd = sc.makeRDD(List(
      ("apple","xxx"),
      ("pig","xxx"),
      ("apple","xxx"),
      ("tree","xxx")
    ),2)

    val partitionRdd = rdd.partitionBy(new MyPartitioner())

    partitionRdd.saveAsTextFile("datas/MyPartitioner")
  }

}

class MyPartitioner extends Partitioner{
  //分区数量
  override def numPartitions: Int = 3

  //根据key  返回分区索引 从0开始
  override def getPartition(key: Any): Int = {
    key match {
      case "apple" => 0
      case "pig" => 1
      case _ => 2
    }

  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/749107.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号