栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Spark RDD算子

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Spark RDD算子

RDD方法=>RDD算子

转换算子(Transformation):功能的补充和封装,将旧的RDD包装成新的RDD(flatMap,map...)行动算子(Action):触发任务的调度和作业的执行(collect)

目录

1)map

2)mapPartitions

3) mapPartitionsWithIndex


1)map

def map[U: ClassTag](f: T => U): RDD[U] 

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
val rdd = sc.makeRDD(List(1,2,3,4))
        // 1,2,3,4
        // 2,4,6,8

        // 转换函数
def mapFunction(num:Int): Int = {
            num * 2
        }
        //回顾Scala至简原则
        //val mapRDD: RDD[Int] = rdd.map(mapFunction)
        //val mapRDD: RDD[Int] = rdd.map((num:Int)=>{num*2})
        //val mapRDD: RDD[Int] = rdd.map((num:Int)=>num*2)
        //val mapRDD: RDD[Int] = rdd.map((num)=>num*2)
        //val mapRDD: RDD[Int] = rdd.map(num=>num*2)
val mapRDD: RDD[Int] = rdd.map(_*2)
mapRDD.collect().foreach(println)

rdd的计算一个分区内的数据是一个一个执行逻辑 只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。 分区内数据的执行是有序的;不同分区数据计算是无序的。 

2)mapPartitions

def mapPartitions[U: ClassTag](  
        f: Iterator[T] => Iterator[U],  
        preservesPartitioning: Boolean = false): RDD[U] 


(使用mapPartitions可以以分区为单位进行数据转换操作,但会将整个分区的数据加载到内存进行引用,如果处理完的数据是不会被释放掉,存在对象的引用,在内存较小,数据量较大场合下,容易内存溢出)

eg:获取每个数据分区的最大值 

val rdd = sc.makeRDD(List(1,2,3,4), 2)

// 【1,2】,【3,4】
// 【2】,【4】
val mpRDD = rdd.mapPartitions(
     iter => {
        List(iter.max).iterator
              }
        )

map和mapPartitions区别:

数据处理角度。Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。功能的角度。Map算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据性能的角度。Map算子因为类似于串行操作,所以性能比较低,而是mapPartitions算子类似于批处理,所以性能较高。但是mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用map操作。 

3)mapPartitionsWithIndex

def mapPartitionsWithIndex[U: ClassTag](

        f: (Int, Iterator[T]) => Iterator[U],  

        preservesPartitioning: Boolean = false): RDD[U] 

eg:获取第二个数据分区的数据 

val rdd = sc.makeRDD(List(1,2,3,4), 2)
// 【1,2】,【3,4】
val mpiRDD = rdd.mapPartitionsWithIndex(
    (index, iter) => {
          if ( index == 1) {
                iter
                   } else {
                Nil.iterator
                   }
            }

eg.获取每个数据的分区

val mpiRDD = rdd.mapPartitionsWithIndex(
   (index, iter) => {
    // 1,   2,    3,   4
    //(0,1)(2,2),(4,3),(6,4)
         iter.map(
             num => {
                  (index, num)
                    }
                )
            }
        )
4)flatMap

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] 

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射,与map相比就是多了扁平化操作

eg.将List(List(1,2),3,List(4,5))进行扁平化操作 

val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))

val flatRDD = rdd.flatMap(
    data => {
    //模式匹配
          data match {
              case list:List[_] => list
              case dat => List(dat)
                }
            }
        )

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/733057.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号