栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark学习之路二——Spark 的核心之 RDD

Spark学习之路二——Spark 的核心之 RDD

Spark学习之路二——Spark 的核心之 RDD基础
一. 概述 1.1 什么是 RDD

RDD(Resilient Distributed Dataset)—— 弹性分布式数据集。

  1. RDD 是 Spark 中的抽象数据结构类型,Spark 中最基本的数据抽象,实现了以操作本地集合的方式来操作分布式数据集的抽象实现。

  2. 它代表一个不可变、可分区、里面的元素可并行计算的集合。

  3. RDD 具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。

  4. RDD 允许用户在执行多个查询时显示地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

  5. RDD 是 Spark 最核心的东西,RDD 必须是可序列化的。RDD 可以 cache 到内存中,省去了 MapReduce 大量的磁盘 IO 操作。

  6. 任何数据在 Spark 中都被表示为 RDD。从编程的角度来看,RDD 可以简单看成是一个数组。和普通数组的区别是,RDD 中的数据是分区存储在不同的机器上,同时可以被并行处理。

  7. 作用:Spark 应用程序所作的无非是把需要处理的数据转换为RDD,然后对 RDD 进行一系列的变换和操作从而得到结果。

1.2 Spark 编程模型

  1. RDD 被表示为对象。
  2. 通过对象上的方法调用来对 RDD 进行转换。
  3. 最后输出结果或是向存储系统保存数据。
  4. RDD 转换算子被称为 Transformation。
  5. 只有遇到 Action算子,才会执行 RDD的计算(懒执行)。
1.2.1 DataSource
  1. 定义:spark的数据来源
  2. 分类
    • DB(数据库)
    • File System(文件系统)
    • Socket(传输)
    • HDFS、Hbase … …
1.2.2 SparkContext
  1. RDD是一个对象。
  2. 是 Spark 的第一个类的入口,负责集群的交互。
  3. 用于连接 Spark 集群,创建 RDD,累加器,广播变量 …
  4. Method:Transformation(转换)、Action(动作)
  5. Spark 的物理模型

  • Driver:主要是对 SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建 Spark 应用程序的运行环境,在初始化SparkContext,要先导入一些 Spark 的类和隐式转换;在Executor部分运行完毕后,需要将SparkContext关闭。在Executor中完成数据的处理,数据有以下几种:

    • Scala集合数据(测试)
    • 文件系统、DB(SQL,NOSQL)的数据
    • RDD
    • 网络
  • RAM:随机存取存储器(内存)

    如果 Spark 集群是服务器则Driver发送tasks给工作节点,worker返回结果给driver。

1.2.3 Driver

  • SparkConf

    是 Spark 配置类,配置项包括:

    • master
    • appName
    • Jars
    • ExecutorEnv
  • SparkEnv

    • 利用Rpc协议 ——> 心跳机制,传输数据
    • 维护 Spark 的执行环境,有:
      • serializer
      • RpcEnv
      • Block Manager
      • 内存管理等
  • DAGScheduler

    • 高层调度器
    • 将Job按照RDD的依赖关系划分成若干了TaskSet(任务集),也称为Stage(阶段、时期);再结合当前缓存情况及数据就近原则,将Stage提交给TaskScheduller。

  • Task Scheduler

    负责任务调度资源的分配。

  • ScheduleBackend

    负责集群资源的获取和调度。

1.3 RDD属性

RDD的五个特征包含四个函数和一个属性

1.3.1 一组分片(Partition)分片

一组分片(Partition):即数据集的基本组成单位。每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU core的数目(设置的最大core数)。

1.3.2 一个计算每个分区的函数

RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。Spark 中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

1.3.3 依赖关系

RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系(窄依赖(有一对一),宽依赖(多对多))。在部分分区数据丢失时,Spark 可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

1.3.4 RDD的分片函数

**一个Partitioner,即RDD的分片函数。**当前 Spark 函数实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有partitioner,非key-value的RDD的Partitioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

1.3.5 一个列表

存储存取每个Partition的优先位置(preffered location)。对于一个HDFS文件来说,这个列表保存的就是Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark 在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

1.3.6 只读的

RDD是只读的,要想改变RDD中的数据,只能创建一个新的RDD转换到另一个RDD,通过操作算子(map、filter、union、join、reduceByKey … …)实现,不再像MR那样只能写map和reduce了。

1.4 RDD特点 1.4.1 分区

RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。

  1. RDD分区的原因

    要处理的原始数据很大,会被分成很多个分区,分别保存在不同的节点上。

  2. RDD分区的目的

    设置合理的并行度,提高数据处理的性能。

  3. RDD分区的原则

    尽可能使得分区的个数等于集群核心数目;

    尽可能使同一个RDD不同分区内的记录的数量一致;

  4. RDD如何分区

    Spark 包含两种数据分区方式:HashPartitioner(哈希分区)和RangePartitioner(范围分区),数据分区方式只作用于形式的数据

    • HashPartitioner

      HashPartitioner采用哈希的方式对键值对数据进行分区。其数据分区规则为partitionedId=Key.hashCode % numPartitions,其中partitionedId代表该key对应的键值对数据分配到的Partition标识,Key.hashCode表示该key的哈希值,numPartitions表示包含的Partition个数

    • RangePartitioner

      Spark 引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner采用哈希的方式将同一类型的key分配到同一个Partition中,因此当某一或某几种雷西那个数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。RangePartitioner基于抽样的思想来对数据进行分区。

  5. 分区器

    • 作用

      决定了RDD中分区的个数;

      RDD中每条数据经过Shuffle过程属于哪个分区;

      reduce的个数;

      注意:

      • 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD的分区器的值是NONE
      • 如果是对键操作,则子RDD不再继承父RDD的分区器,但是分区数会继承
    • HashPartitioner(默认)

      hashCode % 分区数 = 余数—>决定在哪个区

      该分区方法保证key相同的数据出现在同一个分区中

      易发生数据倾斜

    • RangePartitioner(sort之类)

      简单的说就是将一定范围的数映射到某一个分区内

      sample采样抽样确定边界

    • 什么操作会导致子RDD失去父RDD的分区方式

      如果是对键操作,则子RDD不再继承父RDD的分区器,但是分区数会继承

      使用map()算子生成的RDD,由于该转换操作理论上可能会改变元素的键(Spark 并不会去判断是否真的改变了键),所有不再继承父RDD的分区器

    • 多元RDD的分区操作后,子RDD如何继承分区信息

      对于两个或多个RDD的操作,生成新的RDD,其分区方式,取决于父RDD的分区方式,如果两个父RDD都设置过分区方式,则会选择第一个父RDD的分区方式

    • 函数操作

      • 查看分区方式:rdd.partitioner

      • 查看分区个数:rdd.getNumPartitions == rdd.partitions.size

      • 查看分区存储规律:getElement(rdd)

      • 获得默认分区数:rdd.defaultParallelism

      • 重新定义partitioner主动使用分区

        用户可以通过partitionBy主动使用分区器,通过partitions参数指定想要分区的数量,可重新定义partitioner主动使用分区:var rdd1=rdd.partitionBy(new org.apache.spark.HashPartitioner(2))

      • 重新设置分区coalesce()和repartition()

        // coalesce() 和 repartition()
        val rdd=makeRDD(arr,9)
        rdd.coalesce(num,false)=rdd.coalesce(num)
        // 注意:num 的数值必须小于原分区的数量(因为是false)
        rdd.repartition(num)=rdd.coalesce(num,true)
        // 注意:num 可大于也可小于原分区的数值
        
1.4.2 只读

RDD是只读的,要想改变RDD中的数据,只能在现有RDD的基础上创建新的RDD;由一个RDD转换到另一个RDD,可以通过丰富的操作算子 (map,filter,union,join,‘reduceByKey’ … …)实现,不再像MR那样只能写map和reduce了。

1.4.3 依赖

RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs至今啊维护着这种血缘关系(lineage),也称之为依赖。依赖包括两种:一种是窄依赖,RDDs之间分区是一一对应的;另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系

  • 窄依赖

    每个父RDD的一个Partition最多被子RDD的一个Partition所使用(1:1或 n:1)

  • 宽依赖

    一个父RDD的一个Partition会被多个或所有子RDD的Partition所使用(1:n或 n:n)

  • 作用

    • 用来解决数据容错
    • 用来划分stage
  • stage的划分

    • 划分

      action触发job,依照RDD依赖关系切分若干TaskSet即(stage);task任务处理的最小单元划分Stage

      • 从后向前遍历,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加如到Stage中;
      • 每个Stage里面的task的数量是由该Stage中最后一个RDD的Partition数量决定的;
      • 最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是ShuffleMapTask
      • 代表当前Stage的算子一定是该Stage的最后一个计算步骤
  • 查看依赖

    • 查看依赖长度:rdd1.dependencies.size

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-owLnShV3-1633932110922)(C:Users21349Desktophtml2021-09-26_142733.png)]

    • 查看所有父RDD:rdd.dependencies.collect
    • 查看第一个父RDD数据并转换格式:rdd.dependencies(0).rdd.collect.asInstanceOf[Array[Int]]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-roFtNt6s-1633932110924)(C:Users21349Desktophtml9-262.png)]

1.4.4 持久化(缓存)

可以控制存储级别(内存、磁盘等)来进行持久化。如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

  • cache 持久化

    caceh实际上是persist的一种简化方式,是一种懒执行的,执行action类算子才会触发,cache后返回值要赋值给一个变量,下一个job直接基于变量进行操作。

  • persist

    可以指定持久化的级别,最常用的是MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER。”_2”表示有副本数。

  • cache 和 persist

    cache和persist算子后不能立即紧跟action算子。因为rdd.cache().count()返回的不是持久化的RDD,而是一个数值了。

  • checkpoint

    checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。

    checkpoint 的执行原理:
    (1) 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
    (2) 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
    (3) Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
    优化:
    对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。

1.5 创建RDD 1.5.1 合并行化创建(通过scala集合创建)

通过集合并行化方式创建RDD,适用于本地测试,做实验
scala中的本地集合 -->Spark RDD`

  • parallelize(数据集)方法
scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :26

scala> rdd.collect
collect   collectAsync

scala> rdd.collect
res18: Array[Int] = Array(1, 2, 3, 4, 5)

scala>
  • makeRDD(数据集)方法
scala> val arr = Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.makeRDD(arr)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[7] at makeRDD at :26

scala> rdd.collect
res19: Array[Int] = Array(1, 2, 3, 4, 5)

scala> 
1.5.2 文件系统,比如 HDFS
  • 读取wHDFS文件系统(默认)
scala> val rdd2 = sc.textFile("wc.txt")
rdd2: org.apache.spark.rdd.RDD[String] = wc.txt MapPartitionsRDD[9] at textFile at :24

scala> val rdd2 = sc.textFile("hdfs://linux121:9000/wc.txt")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs://linux121:9000/wc.txt MapPartitionsRDD[11] at textFile at :24

scala> 
  • 读取本地文件
scala> val rdd2 = sc.textFile("file:///opt/lagou/servers/a.txt")
rdd2: org.apache.spark.rdd.RDD[String] = file:///opt/lagou/servers/a.txt MapPartitionsRDD[23] at textFile at :24

scala> rdd2.collect
res25: Array[String] = Array(hadoop, hive, kafka, hadoop, spark, azkaban, java, hdfs, hive)

scala> 
1.5.3 从父RDD转换成新的子RDD

调用Transformation类的方法,生成新的RDD只要调用transformation类的算子,都会生成一个新的RDD。RDD中的数据类型,由传入给算子的函数的返回值类型决定。

注意:action类的算子不会生成新的RDD

scala> val arr=Array(1,2,3,4,5)
arr: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd=sc.parallelize(arr)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at parallelize at :26

scala> rdd.map
map   mapPartitions   mapPartitionsWithIndex

scala> rdd.map(_*100)
res26: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[25] at map at :26

scala> rdd.collect
res27: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd2 = rdd.map(_*100)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[26] at map at :25

scala> rdd2.collect
res28: Array[Int] = Array(100, 200, 300, 400, 500)

scala>
二. 操作算子 2.1 RDD的Transformation算子 2.1.1value类型 2.1.1.1 map(func)函数

作用: 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24

scala> rdd.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)                         

scala> val rdd2 = rdd.map(_*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :25

scala> rdd2.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

scala>
2.1.1.2 mapPartitions(func)

作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须时Iterator[T] => Iterator[U]。假设有 N 个元素,有 M 个分区,那么map的函数将被调用 N 次,而mapPartitions被调用 M 次,一个函数一次处理所有分区

scala> val rdd=sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at :24

scala> val rdd2 = rdd.map
map   mapPartitions   mapPartitionsWithIndex

scala> val rdd2 = rdd.mapPartitions
mapPartitions   mapPartitionsWithIndex

scala> val rdd2 = rdd.mapPartitions(x =>x.map(_*2))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at mapPartitions at :25

scala> rdd2.collect
res2: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

scala> 
2.1.1.3 mapPartitionsWithIndex(func)

作用:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int,Interator[T]) => Iterator[U];

scala> val rdd=sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :24

scala> val indexRDD=rdd.map
map   mapPartitions   mapPartitionsWithIndex

scala> val indexRDD=rdd.mapPartitions
mapPartitions   mapPartitionsWithIndex

scala> val indexRDD=rdd.mapPartitionsWithIndex((index,items) =>(items.map((index,_))))
indexRDD: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at :25

scala> indexRDD.collect
res3: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))

scala> 
2.1.1.4 flatMap(func)

作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

scala> val rdd=sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at :24

scala> rdd.collect
res4: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd2=rdd.flatMap(1 to _*2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at flatMap at :25

scala> rdd2.collect
res5: Array[Int] = Array(1, 2, 1, 2, 3, 4, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> 
2.1.1.5 map()和mapPartition()的区别
  • map():每次处理一条数据
  • mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM
  • 开发指导:当内存控件较大的时候建议使用mapPartition(),以提高处理效率。
2.1.1.6 glom()

作用:将一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]

// 创建一个含有4个分区的 RDD
scala> val rdd=sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :24
scala> rdd.glom().collect
collect   collectAsync
// 将每个 RDD 分区的数据放到一个数组并收集到 Driver 端打印
scala> rdd.glom().collect
res6: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))

scala> 
2.1.1.7 groupBy(func)

作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器

scala> val rdd=sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at :24

scala> val group = rdd.groupBy(_%2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[13] at groupBy at :25

scala> group.collect
res7: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8, 10)), (1,CompactBuffer(7, 9, 1, 3, 5)))

scala> 
2.1.1.8 filter(func)

​ 作用:过滤,返回一个新的 RDD,该 RDD由经过func函数计算后返回值为true的输入元素组成。

scala> var sourceFilter=sc.parallelize(Array("xiaohe","xiaojiang","xiaoli","laozhao"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at :24

scala> sourceFilter.collect
res5: Array[String] = Array(xiaohe, xiaojiang, xiaoli, laozhao)

scala> sourceFilter.filter(_ contains("xiao")).collect
res6: Array[String] = Array(xiaohe, xiaojiang, xiaoli)

scala>
2.1.1.9 sample(withReplacement,fraction,seed)

作用:以指定的随即种子随机抽样出数量为fraction的数据,``withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,seed`用于指定随机数生成器种子。

scala> var rdd=sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :24

scala> rdd.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> var sample1 = rdd.sa
sample   sampleStdev   sampleVariance   saveAsObjectFile   saveAsTextFile

scala> var sample1 = rdd.sample
sample   sampleStdev   sampleVariance

scala> var sample1 = rdd.sample(true,0.4,2)
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[6] at sample at :25

scala> sample1.collect
res8: Array[Int] = Array(1, 2, 2)

scala> var sample1 = rdd.sample(false,0.2,3)
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[7] at sample at :25

scala> sample1.collect
res9: Array[Int] = Array(1, 9)

scala> var sample2 = rdd.sample(false,0.2,3)
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[8] at sample at :25

scala> sample2.collect
res10: Array[Int] = Array(1, 9)

scala>
2.1.1.10 distinc([numTasks])

作用:对源RDD进行去重返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。

scala> var rdd=sc.parallelize(List(1,2,1,5,2,6,9,1))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[9] at parallelize at :24

scala> rdd.collect
res11: Array[Int] = Array(1, 2, 1, 5, 2, 6, 9, 1)

scala> var rdd1 = rdd.distinct
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at distinct at :25

scala> rdd1.collect
res12: Array[Int] = Array(6, 2, 1, 9, 5)
// 指定并行度为 2
scala> var rdd2=rdd.distinct(2)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at distinct at :25

scala> rdd2.collect
res13: Array[Int] = Array(6, 2, 1, 9, 5)

scala>
2.1…2.11 coalesce(numPartitions)

作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

// 创建一个含有四个分区的 rdd
scala> var rdd=sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at :24

scala> rdd.partition
partitioner   partitions
// 查看 rdd 的分区
scala> rdd.partitions
res14: Array[org.apache.spark.Partition] = Array(org.apache.spark.rdd.ParallelCollectionPartition@921, org.apache.spark.rdd.ParallelCollectionPartition@922, org.apache.spark.rdd.ParallelCollectionPartition@923, org.apache.spark.rdd.ParallelCollectionPartition@924)
// 查看 rdd 的分区数
scala> rdd.partitions.size
res15: Int = 4

scala> var rdd1=rdd.co
coalesce   collect   collectAsync   compute   context   copy   count   countApprox   countApproxDistinct   countAsync   countByValue   countByValueApprox
// 对 rdd 进行重新分区
scala> var rdd1=rdd.coalesce(3)
rdd1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[17] at coalesce at :25

scala> rdd1.partition
partitioner   partitions
// 查看重分区后的分区数
scala> rdd1.partitions
res16: Array[org.apache.spark.Partition] = Array(CoalescedRDDPartition(0,ParallelCollectionRDD[16] at parallelize at :24,[I@127b7f9a,None), CoalescedRDDPartition(1,ParallelCollectionRDD[16] at parallelize at :24,[I@4b9cdae,None), CoalescedRDDPartition(2,ParallelCollectionRDD[16] at parallelize at :24,[I@328b6af3,None))

scala> rdd1.partitions.size
res17: Int = 3

scala> 
2.1.1.12 repartition(numPartitions)

作用:根据分区数,重新通过网络随机洗牌所有数据。

scala> var rdd=sc.parallelize(1 to 16,4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24

scala> rdd.partition
partitioner   partitions

scala> rdd.partitions.size
res0: Int = 4

scala> rdd.repartition(2)
res1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at repartition at :26

scala> rdd.partition
partitioner   partitions

scala> rdd.partitions.size
res2: Int = 4

scala> var rdd1=rdd.repartition(2)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at repartition at :25

scala> rdd1.partition
partitioner   partitions

scala> rdd1.partitions.size
res3: Int = 2

scala>
2.1.1.13 coalesce 和 repartition的区别
  • coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle:Boolean=false/true决定
  • repartition实际上是调用的coalesce,默认是进行shuffle的。源码如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
2.1.1.14 sortBy(func,[ascending],[numTasks])

作用:使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正常。

scala> var rdd = sc.parallelize(List(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24
// 按照自身大小排序
scala> rdd.sortBy(x => x).collect
res0: Array[Int] = Array(1, 2, 3, 4)                                            
// 按照与3余数的大小排序
scala> rdd.sortBy(x => x%3).collect
res1: Array[Int] = Array(3, 4, 1, 2)

scala> 
2.1.1.15 pipe(command,[envVars])

作用:管道,针对每个分区,都执行一个shell脚本,返回输出的RDD

// 创建脚本
#!/bin/sh

echo "AA"

while read LINE; do

   echo ">>>"${LINE}

done
// 将脚本分发到所有的节点上

// 创建一个只有一个分区的 RDD
scala> var rdd = sc.parallelize(List("hi","Hello","how","are","you"),1)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[15] at parallelize at :24

// 将脚本作用该 RDD 并打印
scala> rdd.pipe("/opt/lagou/servers/test.sh").collect
res5: Array[String] = Array(AA, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

// 创建一个有两个分区的 RDD
scala> var rdd = sc.parallelize(List("hi","Hello","how","are","you"),2)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at :24

// 将脚本作用该 RDD 并打印
scala> rdd.pipe("/opt/lagou/servers/test.sh").collect
res6: Array[String] = Array(AA, >>>hi, >>>Hello, AA, >>>how, >>>are, >>>you)

scala> 
2.1.2 双 value 类型交互 2.1.2.1 union(otherDataset)

作用:对源RDD和参数RDD求并集后返回一个新的RDD

scala> var rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at :24

scala> var rdd2 = sc.makeRDD(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at makeRDD at :24

scala> var rdd3 = rdd.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[21] at union at :27

scala> rdd3.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5, 5, 6, 7, 8, 9, 10)

scala> var rdd4 = rdd2.un
union   unpersist

scala> var rdd4 = rdd2.union(rdd)
rdd4: org.apache.spark.rdd.RDD[Int] = UnionRDD[22] at union at :27

scala> rdd4.collect
res8: Array[Int] = Array(5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5)

scala>
2.1.2.2 suntract(otherDataset)

作用:计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来

scala> var rdd1 = sc.parallelize(3 to 8)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at :24

scala> var rdd2 = sc.makeRDD(1 to 5)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[24] at makeRDD at :24

scala> var rdd3 = rdd1.subtract(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[28] at subtract at :27

scala> rdd3.collect
res9: Array[Int] = Array(6, 8, 7)

scala> var rdd4 = rdd2.su
subtract   sum   sumApprox

scala> var rdd4 = rdd2.subtract(rdd1)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[32] at subtract at :27

scala> rdd4.collect
res10: Array[Int] = Array(2, 1)

scala>

注意:由以上程序可以看出,当使用subtract时,不同的RDD在前,输出的结果是不同的。

2.1.2.3 intersection(otherDataset)

作用:对源RDD和参数RDD求交集后返回一个新的RDD

scala> var rdd1 = sc.parallelize(1 to 7)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[33] at parallelize at :24

scala> var rdd2 = sc.makeRDD(5 to 10)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at makeRDD at :24

scala> var rdd3 = rdd1.intersection(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at intersection at :27

scala> rdd3.collect
res11: Array[Int] = Array(6, 7, 5)

scala> var rdd4 = rdd2.intersection(rdd1)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[46] at intersection at :27

scala> rdd4.collect
res12: Array[Int] = Array(6, 7, 5)

scala>
2.1.2.4 cartesian

作用:笛卡尔积(尽量避免使用)

scala> var rdd = sc.parallelize(1 to 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[47] at parallelize at :24

scala> var rdd2 = sc.parallelize(2 to 5)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[48] at parallelize at :24

scala> rdd.cartesian(rdd2).collect
res13: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (2,2), (2,3), (3,2), (3,3), (2,4), (2,5), (3,4), (3,5))

scala>
2.1.2.5 zip(otherDataset)

作用:将两个RDD组合成KeyValue形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

scala> var rdd1 = sc.parallelize(Array(1,2,3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[53] at parallelize at :24

scala> var rdd2 = sc.parallelize(Array("a","b","c"))
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[54] at parallelize at :24

scala> rdd1.zip(rdd2).collect
res18: Array[(Int, String)] = Array((1,a), (2,b), (3,c))

scala> rdd2.zip(rdd1).collect
res19: Array[(String, Int)] = Array((a,1), (b,2), (c,3))

scala> var rdd3 = sc.makeRDD(Array("11","22","33"),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[57] at makeRDD at :24

scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 3)
  at org.apache.spark.rdd.ZippedPartitionsbaseRDD.getPartitions(ZippedPartitionsRDD.scala:58)
  at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:273)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
  at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:990)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
  at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
  ... 49 elided

scala> rdd1.partitions.size
res21: Int = 2

scala> rdd3.partitions.size
res22: Int = 3

scala> 
2.1.3 Key-Value 类型 2.1.3.1 partitionBy

作用:对RDD进行分区操作,如果原有的RDD和现有的RDD是一致的话就不进行分区,否则会生成ShuffleRDD,即会产生shuffle过程

scala> var rdd = sc.parallelize(Array((1,"aaa"),(2,"bbb"),(3,"ccc"),(4,"ddd")),4)
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[59] at parallelize at :24

scala> rdd.partitions.size
res23: Int = 4

scala> var rdd2 = rdd.partition
partitionBy   partitioner   partitions

scala> var rdd2 = rdd.partitionBy(new org.)
antlr         ehcache      ietf      jcp      junit     netlib      omg             terracotta   xml     
aopalliance   fusesource   iq80      jets3t   jvnet     nustaq      roaringbitmap   tukaani      yaml    
apache        glassfish    j_paine   joda     mockito   objectweb   slf4j           w3c          znerd   
codehaus      hamcrest     jboss     json4s   mortbay   objenesis   spark_project   xerial               

scala> var rdd2 = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, String)] = ShuffledRDD[60] at partitionBy at :25

scala> rdd2.partitions.size
res24: Int = 2

scala> 
2.1.3.2 groupByKey

作用:对每个key进行操作,但只生成一个sequence

scala> var words = Array("one","two","two","three","three","three")
words: Array[String] = Array(one, two, two, three, three, three)

scala> val rdd = sc.parallelize(words).map(word => (word,1))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[62] at map at :26

scala> val groupRdd = rdd.groupBy
groupBy   groupByKey
// 将相同 key 对应值聚合到一个 sequence 中
scala> val groupRdd = rdd.groupByKey()
groupRdd: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[63] at groupByKey at :25

scala> group
groupRdd   grouping   grouping_id
// 打印结果
scala> groupRdd.collect
res25: Array[(String, Iterable[Int])] = Array((two,CompactBuffer(1, 1)), (one,CompactBuffer(1)), (three,CompactBuffer(1, 1, 1)))
// 计算相同 key 对应值的相加结果
scala> groupRdd.map(t => (t._1,t._2.sum))
res26: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[64] at map at :26
// 打印结果
scala> res26.collect
res27: Array[(String, Int)] = Array((two,2), (one,1), (three,3))

scala> 
2.1.3.3 reduceByKey(func,[numTasks])

作用:在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

scala> val rdd = sc.parallelize(List(("female",1),("male",5),("female",5),("male",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[65] at parallelize at :24

scala> val reduce = rdd.reduce
reduce   reduceByKey   reduceByKeyLocally

scala> val reduce = rdd.reduceByKey
reduceByKey   reduceByKeyLocally
// 计算相同 key 对应值的相加结果
scala> val reduce = rdd.reduceByKey((x,y) => x+y)
reduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[66] at reduceByKey at :25

// 打印结果
scala> reduce.collect
res28: Array[(String, Int)] = Array((female,6), (male,7))

scala> 
2.1.3.4 reduceByKey和groupByKey的区别
  • reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]
  • groupByKey:按照key进行分组,直接进行shuffle
  • 开发指导:效率相等的情况下选择熟悉的,groupByKey在一般情况下效率低,尽量少用,reduceByKey比groupByKey效率高,建议使用。但是需要注意是否会影响业务逻辑。
2.1.3.5 aggregateByKey重点

参数:(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)

提示:seqOp表示分区内进行的操作,combOp表示分区间进行的操作

作用:在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

参数描述:

  • zeroValue:给每一个分区中的每一个key一个初始值;(每个分区内用到初始值,分区间不用初始值)

  • seqOp:函数用于再每一个分区中用初始值逐步迭代value;

  • combOp:函数用于合并每个分区中的结果;

案例:取出每个分区相同key对应值的最大值,然后相加

// 创建一个 rdd
scala> val rdd = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[67] at parallelize at :24

// 取出每个分区相同 key 对应的最大值,然后相加
scala> val agg = rdd.aggregate
aggregate   aggregateByKey

scala> val agg = rdd.aggregateByKey(0)(math.max(_,_),_+_)
agg: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[68] at aggregateByKey at :25

// 打印结果
scala> agg.collect
res29: Array[(String, Int)] = Array((b,3), (a,3), (c,12))

scala>
2.1.3.6 foldByKey

作用:aggregateByKey的简化操作,seqop和combop相同

参数:(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

// 创建一个 rdd
scala>  val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[71] at parallelize at :24

 // 计算相同 key 对应值的相加结果
scala> var agg = rdd.foldByKey(0)(_+_)
agg: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[72] at foldByKey at :25

// 打印结果
scala> agg.collect
res31: Array[(Int, Int)] = Array((3,14), (1,9), (2,3))

scala>
2.1.3.7 combineByKey[C](了解即可)

作用:对相同K,把V合并成一个集合

参数:(createCombiner: V => C, mergevalue: (C, V) => C, mergeCombiners: (C, C) => C)

参数描述:

  • createCombiner:会遍历分区中的所有元素,因此每个combineByKey元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫做createCombiner()的函数来创建哪个键对应的累加器的初始值。
  • mergevalue:如果这是一个在处理当前分区之前已经遇到的键,它会使用mergevalue()方法将该键的累加器对应的当前值与这个新的值进行合并。
  • createCombiners:由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都是由对应他同一个键的累加器,就需要使用用户提供的createCombiners()方法将各个分区的结果进行合并。

案例:创建一个pairRDD,根据key计算每种key的均值。(先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)

scala> // 创建一个 rdd
scala> val input = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)input: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[74] at parallelize at :24

scala> // 将相同key对应的值相加,,同时记录该key出现的次数,,放入一个二元组
scala> val combine = input.combineByKey((_,1),(acc:(Int,Int),v)=>(acc._1+v,acc._2+1),(acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2))
combine: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[75] at combineByKey at :25

scala> // 打印合并后的结果

scala> com
com   combine   compat

scala> combine.collect
res32: Array[(String, (Int, Int))] = Array((b,(286,3)), (a,(274,3)))

scala> // 计算平均值

scala> val result = combine.map{case (key,value) => (key,value._1/value._2.toDouble)}
result: org.apache.spark.rdd.RDD[(String, Double)] = MapPartitionsRDD[76] at map at :25

scala> // 打印结果

scala> result.collect
res33: Array[(String, Double)] = Array((b,95.33333333333333), (a,91.33333333333333))

scala> 
2.1.3.8 sortByKey([ascending],[numTasks])

作用:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

scala> // 创建一个 pairRDD

scala> val rdd = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[77] at parallelize at :24

scala> // 按照 key 的正序

scala> rdd.sortBy
sortBy   sortByKey

scala> rdd.sortByKey(true).collect
res35: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))

scala> // 按照 key 的倒序

scala> rdd.sortByKey(false).collect
res36: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

scala> 
2.1.3.9 mapValues

作用:针对于(K,V)形式的类型只对V进行操作

scala> // 创建一个 pairRDD

scala> val rdd = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[0] at parallelize at :24

scala> // 对 value 添加字符串 “|||”

scala> rdd.map
map   mapPartitions   mapPartitionsWithIndex   mapValues

scala> rdd.mapValues(_+"|||").collect
res0: Array[(Int, String)] = Array((1,a|||), (1,d|||), (2,b|||), (3,c|||))      

scala> 
2.1.3.10 join(othreDataset,[numTasks]) leftOuterJoin/rightOuterJoin/fullOuterJoin

作用:在类型(K,V),(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(K,W))的RDD

scala> // 创建一个 pairRDD

scala> val rdd = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[2] at parallelize at :24

scala> // 创建第二个 pairRDD

scala> val rdd2 = sc.parallelize(Array((1,4),(2,5),(3,6)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at :24

scala> rdd.join(rdd2).collect
res1: Array[(Int, (String, Int))] = Array((2,(b,5)), (1,(a,4)), (3,(c,6)))

scala> val rdd3 = sc.parallelize(Array((1,4),(2,5),(3,6),(4,7)))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[7] at parallelize at :24

scala> rdd.join(rdd3).collect
res2: Array[(Int, (String, Int))] = Array((2,(b,5)), (1,(a,4)), (3,(c,6)))

scala> rdd3.join(rdd).collect
res3: Array[(Int, (Int, String))] = Array((2,(5,b)), (1,(4,a)), (3,(6,c)))

scala>
2.1.3.11 cogroup(otherDataset,[numTasks])

作用:在类型为(K,V),(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD

scala> // 创建两个 pairRDD rdd1 和 rdd2

scala> val rdd1 = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[15] at parallelize at :24

scala> val rdd2 = sc.makeRDD(Array((1,4),(2,5),(3,6)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[16] at makeRDD at :24

scala> // 使用 cogroup 操作 rdd1 和 rdd2

scala> rdd1.cogroup(rdd2).collect
res4: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((2,(CompactBuffer(b),CompactBuffer(5))), (1,(CompactBuffer(a),CompactBuffer(4))), (3,(CompactBuffer(c),CompactBuffer(6))))

scala> rdd2.cogroup(rdd1).collect
res5: Array[(Int, (Iterable[Int], Iterable[String]))] = Array((2,(CompactBuffer(5),CompactBuffer(b))), (1,(CompactBuffer(4),CompactBuffer(a))), (3,(CompactBuffer(6),CompactBuffer(c))))

scala> val rdd3 = sc.makeRDD(Array((1,4),(2,5),(3,6),(4,7)))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[21] at makeRDD at :24

scala> rdd1.cogroup(rdd3).collect
res6: Array[(Int, (Iterable[String], Iterable[Int]))] = Array((4,(CompactBuffer(),CompactBuffer(7))), (2,(CompactBuffer(b),CompactBuffer(5))), (1,(CompactBuffer(a),CompactBuffer(4))), (3,(CompactBuffer(c),CompactBuffer(6))))

scala> rdd3.cogroup(rdd1).collect
res7: Array[(Int, (Iterable[Int], Iterable[String]))] = Array((4,(CompactBuffer(7),CompactBuffer())), (2,(CompactBuffer(5),CompactBuffer(b))), (1,(CompactBuffer(4),CompactBuffer(a))), (3,(CompactBuffer(6),CompactBuffer(c))))

scala>
2.2 RDD的action操作算子 2.2.1 reduce(func)

作用:通过func函数聚集RDD中的所有元素,现聚合分区内数据,再聚合分区间数据。

def reduce(f: (T, T) => T): T = withScope {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult.isDefined) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }
scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at makeRDD at :24

scala> rdd1.reduce(_+_)
res6: Int = 55

scala> val rdd2 = sc.parallelize(Array(("a",1),("c",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[4] at parallelize at :24

scala> rdd2.reduce((x,y)=>(x._1+y._1,x._2+y._2))
res7: (String, Int) = (cdac,12)

scala> 
2.2.2 collect

作用:在驱动程序中,以数组的形式返回数据集的所有元素

def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }
scala> val rdd = sc.makeRDD(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :24

scala> rdd.collect
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> 
2.2.3 count

作用:返回RDD中元素的个数

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24

scala> rdd.count
res0: Long = 10                                                                 

scala> 
2.2.4 first

作用:返回RDD中的第一个元素

  
  def first(): T = withScope {
    take(1) match {
      case Array(t) => t
      case _ => throw new UnsupportedOperationException("empty collection")
    }
  }
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at :24

scala> rdd.fi
filter   first

scala> rdd.first
res1: Int = 1

scala> 
2.2.5take(n)

作用:返回一个由RDD前 n 个元素组成的数组

  
  def take(num: Int): Array[T] = withScope {
    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2)
    if (num == 0) {
      new Array[T](0)
    } else {
      val buf = new ArrayBuffer[T]
      val totalParts = this.partitions.length
      var partsScanned = 0
      while (buf.size < num && partsScanned < totalParts) {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
        var numPartsToTry = 1L
        val left = num - buf.size
        if (partsScanned > 0) {
          // If we didn't find any rows after the previous iteration, quadruple and retry.
          // Otherwise, interpolate the number of partitions we need to try, but overestimate
          // it by 50%. We also cap the estimation in the end.
          if (buf.isEmpty) {
            numPartsToTry = partsScanned * scaleUpFactor
          } else {
            // As left > 0, numPartsToTry is always >= 1
            numPartsToTry = Math.ceil(1.5 * left * partsScanned / buf.size).toInt
            numPartsToTry = Math.min(numPartsToTry, partsScanned * scaleUpFactor)
          }
        }

        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

        res.foreach(buf ++= _.take(num - buf.size))
        partsScanned += p.size
      }

      buf.toArray
    }
  }
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :24

scala> rdd.take
take   takeAsync   takeOrdered   takeSample

scala> rdd.take(3)
res2: Array[Int] = Array(2, 5, 4)

scala>
2.2.6takeOrdered(n)

作用:返回由该RDD排序后的前 n 个元素组成的数组

  
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0) {
      Array.empty
    } else {
      val mapRDDs = mapPartitions { items =>
        // Priority keeps the largest elements, so let's reverse the ordering.
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= collectionUtils.takeOrdered(items, num)(ord)
        Iterator.single(queue)
      }
      if (mapRDDs.partitions.length == 0) {
        Array.empty
      } else {
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      }
    }
  }
scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :24

scala> rdd.takeOrdered(3)
res3: Array[Int] = Array(2, 3, 4)

scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3,3,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at :24

scala> rdd.takeOrdered(3)
res4: Array[Int] = Array(2, 3, 3)

scala>
2.2.7aggregate

参数:(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)

作用:aggregate函数将每个分区里面的元素通过seq()和初始值进行聚合,然后用combine函数将每个分区的结果在和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。每个分区内会用到初始值,然后分区间会再次用到初始值

  
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }
scala> val rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :24

scala> rdd.aggregate(0)(_+_,_+_)
res2: Int = 55

scala> rdd.aggregate(1)(_+_,_+_)
res3: Int = 58

scala> rdd.aggregate(10)(_+_,_+_)
res4: Int = 85

scala> rdd.aggregate(2)(_+_,_+_)
res5: Int = 61

scala> val rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at :24

scala> rdd.aggregate(0)(_+_,_+_)
res6: Int = 55

scala> rdd.aggregate(1)(_+_,_+_)
res7: Int = 59

scala> rdd.aggregate(10)(_+_,_+_)
res8: Int = 95

scala> 
// 初始值 * (分区个数 + 1)+ 原来的值
2.2.8fold(num)(func)

作用:折叠操作,aggregate的简化操作,seqop和combop一样。

  
  def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }
scala> val rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at makeRDD at :24

scala> rdd.fold(0)(_+_)
res10: Int = 55

scala> rdd.fold(1)(_+_)
res11: Int = 58

scala> rdd.fold(2)(_+_)
res12: Int = 61

scala> val rdd = sc.makeRDD(1 to 10,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :24

scala> rdd.fold(0)(_+_)
res13: Int = 55

scala> rdd.fold(1)(_+_)
res14: Int = 59

scala> rdd.fold(2)(_+_)
res15: Int = 63

scala> 
2.2.9saveAsTextFile(path)

作用:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它转换为文件中的文本

  
  def saveAsTextFile(path: String): Unit = withScope {
    // https://issues.apache.org/jira/browse/SPARK-2075
    //
    // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
    // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
    // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
    // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
    // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
    //
    // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
    // same bytecodes for `saveAsTextFile`.
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

  
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = withScope {
    // https://issues.apache.org/jira/browse/SPARK-2075
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
  }
2.2.10saveAsObjectFile(path: String)

作用:用于将RDD中的元素序列化成对象,存储到文件中。

  
  def saveAsObjectFile(path: String): Unit = withScope {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }
2.2.11countByKey

作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

  
  def countByKey(): Map[K, Long] = self.withScope {
    self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
  }
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[6] at parallelize at :24

scala> rdd.countByKey
res16: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

scala>
2.2.12foreach(func)

作用:在数据集的每一个元素上,运行函数func进行更新

  // Actions (launch a job to return a value to the user program)

  
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }
四. 输入与输出 4.1 文件输入与输出 4.1.1 文本文件 4.1.2 csv/tsv 文件 4.1.3 json文件(不建议使用) 4.1.4 sequenceFile 4.1.5 对象文件
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/316402.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号