栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark-core 转换算子(九)

Spark-core 转换算子(九)

Transformations 算子详解 二
  上一篇,我们主要分析了一下简单的转换算子,这里我们先分析一下常见的转换算子。


1、groupBy算子

  groupBy算子如其名,分组算子。但是我们需要制定分组函数。它和groupByKey不同,groupByKey直接按照key分组。
源码部分:

  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
    groupBy[K](f, defaultPartitioner(this))
  }
  def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
      : RDD[(K, Iterable[T])] = withScope {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(p)
  }

通过源码发现,groupBy的底层任仍然是groupByKey算子,经过map得到灭搁置的一个函数标签,类似于键值对,在按照groupByKey分分组得到RDD[(K, Iterable[T])]这样的结果类型。
用法:

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("BOKE")
    val sc = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
    val value: RDD[(Boolean, Iterable[Int])] = rdd.groupBy((x: Int) => x % 2 == 0)
	    println(value.count())
	    println(value.max())
	    value.foreach((iter: (Boolean, Iterable[Int])) =>{
      	println(iter._2.toList.mkString(","))
    	})
  }

在上面的groupBy后,我们可以做一些分组统计,比如:max、min、count等等。
也可以直接输出进行下一步的操作。

2、groupByKey算子

  groupByKey类似于groupBy算子,就是分组,但是groupByKey是根据键值对的键值进行分组。返回值是RDD[String, Iterable[Int]]
源码:

  def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
    groupByKey(defaultPartitioner(self))
  }
  
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergevalue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergevalue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

用法:

      val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("BOKE")
      val sc = new SparkContext(conf)
      val rdd: RDD[String] = sc.makeRDD(List("spark", "hello", "hadoop","hello"))
      rdd.map((_: String,1)).groupByKey().
        map((x: (String, Iterable[Int])) =>(x._1,x._2.sum))
        .foreach(println)

以上的实例就是wordCount的经典例子。
这里groupByKey就是一个简单的按照key分组的例子,里面不需要传参数,没有分区内分组聚合的功能,随着数据量的增加,shuffle过程中,可能会产生OOM。

3、reduceByKey算子
  reduceByKey算子是多键值的分组聚合算子,它和groupByKey算子最大的区别是可以传入聚合函数,支持分组内聚合,会减少shuffle的数量。
源码:

  
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
 
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): 		    RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/674101.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号