RDD方法=>RDD算子
转换算子(Transformation):功能的补充和封装,将旧的RDD包装成新的RDD(flatMap,map...)行动算子(Action):触发任务的调度和作业的执行(collect)
目录
1)map
2)mapPartitions
3) mapPartitionsWithIndex
1)map
def map[U: ClassTag](f: T => U): RDD[U]
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
// 1,2,3,4
// 2,4,6,8
// 转换函数
def mapFunction(num:Int): Int = {
num * 2
}
//回顾Scala至简原则
//val mapRDD: RDD[Int] = rdd.map(mapFunction)
//val mapRDD: RDD[Int] = rdd.map((num:Int)=>{num*2})
//val mapRDD: RDD[Int] = rdd.map((num:Int)=>num*2)
//val mapRDD: RDD[Int] = rdd.map((num)=>num*2)
//val mapRDD: RDD[Int] = rdd.map(num=>num*2)
val mapRDD: RDD[Int] = rdd.map(_*2)
mapRDD.collect().foreach(println)
rdd的计算一个分区内的数据是一个一个执行逻辑 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。 分区内数据的执行是有序的;不同分区数据计算是无序的。
2)mapPartitions
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
(使用mapPartitions可以以分区为单位进行数据转换操作,但会将整个分区的数据加载到内存进行引用,如果处理完的数据是不会被释放掉,存在对象的引用,在内存较小,数据量较大场合下,容易内存溢出)
eg:获取每个数据分区的最大值
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
// 【2】,【4】
val mpRDD = rdd.mapPartitions(
iter => {
List(iter.max).iterator
}
)
map和mapPartitions区别:
数据处理角度。Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。功能的角度。Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据性能的角度。Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用map操作。
3)mapPartitionsWithIndex
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
eg:获取第二个数据分区的数据
val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) => {
if ( index == 1) {
iter
} else {
Nil.iterator
}
}
eg.获取每个数据的分区
val mpiRDD = rdd.mapPartitionsWithIndex(
(index, iter) => {
// 1, 2, 3, 4
//(0,1)(2,2),(4,3),(6,4)
iter.map(
num => {
(index, num)
}
)
}
)
4)flatMap
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射,与map相比就是多了扁平化操作
eg.将List(List(1,2),3,List(4,5))进行扁平化操作
val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))
val flatRDD = rdd.flatMap(
data => {
//模式匹配
data match {
case list:List[_] => list
case dat => List(dat)
}
}
)



