栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark-core 转换算子(八)

Spark-core 转换算子(八)

Transformations算子详解 一
  上一篇我们宏观的分析了Transformations算子和Action算子,现在我们开始兄原理到源码逐个详细的分析算子。

1、map算子
源码:
1、首先会执行第一个map函数,创建MapPartitionsRDD,
2、然后内部调用第二个map函数,如一下map,很明显,A中的元素都被经过f作用后生成新的算子B

//Return a new RDD by applying a function to all elements of this RDD.
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)// 检查函数序列化,并执行闭包操作、
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
  }
  
  
  def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
  }

2、filter算子
  filter是元素过滤算子,较为简单看下源码:

  
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (_, _, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
  
  def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A] {
    // TODO 2.12 - Make a full-fledged FilterImpl that will reverse sense of p
    private var hd: A = _
    private var hdDefined: Boolean = false

    def hasNext: Boolean = hdDefined || {
      do {
        if (!self.hasNext) return false
        hd = self.next()
      } while (!p(hd))
      hdDefined = true
      true
    }

用法:

    val rdd:RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // rdd中的每一个元素都会经过*2转换
    rdd.filter(x=>x>=3).foreach(println)

3、flatMap算子
  flatMap是扁平化算子,可以理解为是由两个算子组成,map和flattening。具体具体执行先执行map在执行flattening。这也是和map的主要区别,flatMap在map的基础上增加了扁平化而已。
源码:

  
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
  }
  
  def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
    private var cur: Iterator[B] = empty
    private def nextCur(): Unit = { cur = null ; cur = f(self.next()).toIterator }
    def hasNext: Boolean = {
      // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
      // but slightly shorter bytecode (better JVM inlining!)
      while (!cur.hasNext) {
        if (!self.hasNext) return false
        nextCur()
      }
      true
    }
    def next(): B = (if (hasNext) cur else empty).next()
  }

用法:

    val rdd:RDD[String] = sc.makeRDD(List("aa,bb","cc,dd","ee","","ff"))
    rdd.flatMap(x=> x.split(",")).foreach(println)
    OutPut:
		ee
		ff
		
		cc
		dd
		aa
		bb

这个我引用一个看到的其他比较复杂的例子,大家好好看看。

    val arr=sc.parallelize(Array(("A",1),("B",2),("C",3)))
    arr.flatMap(x=>(x._1+x._2)).foreach(println)
    //输出结果
		A
		1
		B
		2
		C
		3
		A;B;C;D;B;D;C
		B;D;A;E;D;C
		A;B
    val data=sc.parallelize(List("A;B;C;D;B;D;C", "B;D;A;E;D;C", "A;B"))
    data.map(_.split(";")).flatMap(x=>{
      for(i<-0 until x.length-1) yield (x(i)+","+x(i+1),1)
    	}).reduceByKey(_+_).foreach(println)
      //输出结果
    	(A,E,1)
		(E,D,1)
		(D,A,1)
		(C,D,1)
		(B,C,1)
		(B,D,2)
		(D,C,2)
		(D,B,1)
		(A,B,2)

在第二个例子中,我们更容易理解扁平化操作的实现。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651457.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号