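1. The map operator
Source code:
1. The first map function (RDD.map) runs first and creates a MapPartitionsRDD.
2. Internally, that RDD then calls the second map function (Iterator.map, listed after it). Every element of type A is passed through f, which yields a new element of type B.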
// Return a new RDD by applying a function to all elements of this RDD.
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f) // clean the closure and check that it is serializable
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

// scala.collection.Iterator.map, which iter.map(cleanF) calls for each partition
def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
  def hasNext = self.hasNext
  def next() = f(self.next())
}
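To make the behaviour of the second map concrete, here is a minimal plain-Scala sketch that needs no Spark. The helper name mapIterator and the seen buffer are hypothetical and exist only for this illustration; the helper wraps an iterator the same way the Iterator.map source above does, so f is applied only when an element is actually requested.

```scala
// Hypothetical helper (not part of Spark or the Scala library):
// wraps a source iterator the same way Iterator.map does.
def mapIterator[A, B](self: Iterator[A])(f: A => B): Iterator[B] =
  new Iterator[B] {
    def hasNext: Boolean = self.hasNext // delegate to the source iterator
    def next(): B = f(self.next())      // apply f only when an element is requested
  }

// Record which elements f has been applied to, to make the laziness visible.
val seen = scala.collection.mutable.ListBuffer.empty[Int]
val doubled = mapIterator(Iterator(1, 2, 3, 4)) { x => seen += x; x * 2 }

val beforePull = seen.toList                         // List(): nothing evaluated yet
val firstTwo = List(doubled.next(), doubled.next())  // List(2, 4)
val afterPull = seen.toList                          // List(1, 2): only the pulled elements
```

This is why creating a MapPartitionsRDD does no work by itself: the function runs only when something downstream pulls elements from the partition's iterator.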
2. The filter operator
filter is the element-filtering operator. It is fairly simple, so let's go straight to the source:
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (_, _, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}

// scala.collection.Iterator.filter, which iter.filter(cleanF) calls for each partition
def filter(p: A => Boolean): Iterator[A] = new AbstractIterator[A] {
  // TODO 2.12 - Make a full-fledged FilterImpl that will reverse sense of p
  private var hd: A = _
  private var hdDefined: Boolean = false

  def hasNext: Boolean = hdDefined || {
    do {
      if (!self.hasNext) return false
      hd = self.next()
    } while (!p(hd))
    hdDefined = true
    true
  }

  def next() = if (hasNext) { hdDefined = false; hd } else empty.next()
}
Usage:
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
// keep only the elements that are >= 3
rdd.filter(x => x >= 3).foreach(println)
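The look-ahead buffering in the Iterator.filter source (hd and hdDefined) can be sketched in plain Scala without Spark. The helper name filterIterator is hypothetical, and the sketch uses an Option and a while loop instead of the library's do/while with an early return, so treat it as an approximation of the same idea, not the library code.

```scala
// Hypothetical helper (not part of Spark or the Scala library):
// hasNext looks ahead and buffers the next element that satisfies p.
def filterIterator[A](self: Iterator[A])(p: A => Boolean): Iterator[A] =
  new Iterator[A] {
    private var hd: Option[A] = None // buffered element that already passed p

    def hasNext: Boolean = {
      // Pull from the source until an element passes p or the source is exhausted.
      while (hd.isEmpty && self.hasNext) {
        val candidate = self.next()
        if (p(candidate)) hd = Some(candidate)
      }
      hd.isDefined
    }

    def next(): A = {
      if (!hasNext) throw new NoSuchElementException("next on empty iterator")
      val result = hd.get
      hd = None // clear the buffer so the next call looks ahead again
      result
    }
  }

val kept = filterIterator(Iterator(1, 2, 3, 4))(_ >= 3).toList // List(3, 4)

val it = filterIterator(Iterator(1, 2, 3, 4))(_ >= 3)
val stillThere = it.hasNext && it.hasNext // true: repeated hasNext calls consume nothing extra
val first = it.next()                     // 3
```

The buffer is what lets hasNext answer truthfully: it cannot know whether another matching element exists without reading ahead, so it keeps the element it found until next() hands it out.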
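3. The flatMap operator
flatMap is the flattening operator. It can be thought of as two operations combined, map followed by flatten: f is first applied to every element, and the collections it returns are then flattened into a single sequence. This is the main difference from map; flatMap simply adds the flattening step on top of map.
Source code: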
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}

def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
  private var cur: Iterator[B] = empty
  private def nextCur(): Unit = { cur = null ; cur = f(self.next()).toIterator }
  def hasNext: Boolean = {
    // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
    // but slightly shorter bytecode (better JVM inlining!)
    while (!cur.hasNext) {
      if (!self.hasNext) return false
      nextCur()
    }
    true
  }
  def next(): B = (if (hasNext) cur else empty).next()
}
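Usage:
val rdd: RDD[String] = sc.makeRDD(List("aa,bb", "cc,dd", "ee", "", "ff"))
rdd.flatMap(x => x.split(",")).foreach(println)
Output (the order depends on how the partitions are scheduled, so it can differ between runs; the empty input string also prints one blank line, which is not shown here):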
ee
ff
cc
dd
aa
bb
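The claim that flatMap is map followed by flatten can be checked on an ordinary Scala List, with no Spark involved. This is only a local sketch using the same input strings as the usage example; the value names are made up for the illustration.

```scala
val lines = List("aa,bb", "cc,dd", "ee", "", "ff")

// Step 1: map alone gives one collection per input element.
val mapped: List[List[String]] = lines.map(_.split(",").toList)

// Step 2: flatten merges those collections into a single list.
val viaMapThenFlatten: List[String] = mapped.flatten

// flatMap does both steps in one call.
val viaFlatMap: List[String] = lines.flatMap(_.split(",").toList)
// Both are List("aa", "bb", "cc", "dd", "ee", "", "ff")
```

Note that "".split(",") returns an array holding one empty string, so the empty input survives as an empty element instead of disappearing.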
Below are two more complex examples that I came across elsewhere; they are worth a careful look.
val arr = sc.parallelize(Array(("A", 1), ("B", 2), ("C", 3)))
arr.flatMap(x => (x._1 + x._2)).foreach(println)
// Output
A
1
B
2
C
3
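The first example prints single characters because x._1 + x._2 is a String such as "A1", and Scala treats a String as a sequence of Char when a collection is expected, so flatMap flattens each string into its characters. A plain-Scala sketch of the same effect follows; it calls toList explicitly where the Spark example relies on the implicit conversion, and the value names are made up for the illustration.

```scala
val pairs = List(("A", 1), ("B", 2), ("C", 3))

// map keeps one string per tuple.
val concatenated: List[String] = pairs.map(x => x._1 + x._2)
// List("A1", "B2", "C3")

// flatMap flattens each string into its characters.
val chars: List[Char] = pairs.flatMap(x => (x._1 + x._2).toList)
// List('A', '1', 'B', '2', 'C', '3')
```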
Input data for the second example:
A;B;C;D;B;D;C
B;D;A;E;D;C
A;B
val data = sc.parallelize(List("A;B;C;D;B;D;C", "B;D;A;E;D;C", "A;B"))
data.map(_.split(";")).flatMap(x => {
  for (i <- 0 until x.length - 1) yield (x(i) + "," + x(i + 1), 1)
}).reduceByKey(_ + _).foreach(println)
// Output
(A,E,1)
(E,D,1)
(D,A,1)
(C,D,1)
(B,C,1)
(B,D,2)
(D,C,2)
(D,B,1)
(A,B,2)
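The second example makes the flattening easier to see: each input line produces its own sequence of (pair, 1) tuples, and flatMap merges those sequences into a single RDD before reduceByKey sums the counts per pair.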
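The adjacent-pair count can also be reproduced on local Scala collections, which makes the intermediate flattened list easy to inspect. In this sketch groupBy plus a sum stands in for Spark's reduceByKey; that substitution and the value names are assumptions made for the illustration only.

```scala
val data = List("A;B;C;D;B;D;C", "B;D;A;E;D;C", "A;B")

// Each line yields a sequence of ("X,Y", 1) tuples; flatMap merges them into one list.
val pairs: List[(String, Int)] = data.map(_.split(";")).flatMap { x =>
  for (i <- 0 until x.length - 1) yield (x(i) + "," + x(i + 1), 1)
}
// pairs has 6 + 5 + 1 = 12 entries

// Local stand-in for reduceByKey(_ + _): group by key, then sum the ones.
val counts: Map[String, Int] =
  pairs.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).sum }
// counts("A,B") == 2, counts("B,D") == 2, counts("D,C") == 2, all others are 1
```

With map instead of flatMap the result would be a list of three sequences, one per line, and the pairs could not be grouped across lines without flattening first.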



