2、认识RDD
1、RDD是什么 RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据
抽象。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算
的集合
-
A list of partitions RDD是一个包含了多个分区的列表
-
A function for computing each split RDD会在每个分片(分区)上单独启动一个线程并行完成函数运算
-
A list of dependencies on other RDDs RDD会记录与上游RDD的依赖列表
-
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) 二元组作为泛型的RDD,可以设置分区器 按照key对数据进行分组或者聚合操作
-
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) RDD会默认从最优位置读取原始文件
2、不可变(只读) 由于RDD计算出错时,需要从上游RDD读取数据重新计算 所以每次 转换算子操作,都不改变原有RDD的数据 而是读取原有RDD的数据将计算完的结果写入到新的RDD中
3、 弹性 随时调整并发度 RDD中的分区数 对应Spark程序中的(Task)线程数(并发度) 可以在每一步计算中随时调整RDD的分区数来改变程序的并发度从而优化任务流程
4、 任务出错可以自动重试 Spark任务计算出错时,会首先从上游RDD重新读取数据进行重试 如果重试失败,会将当前任务迁移到另一台worker重新尝试
5、 懒加载(懒执行) RDD只有在调用action算子时才从头开始真正执行计算任务 而如果整个RDD依赖列表没有action操作 RDD仅仅编译并记录依赖关系,不进行真正的执行
6、RDD的四类操作
-
创建RDD
-
RDD transformation 转换算子 RDD.转换算子 => RDD
-
RDD action 行动算子 RDD.行动算子 => !RDD
-
RDD 缓存操作
3、创建RDD
3.1 通过scala集合创建RDD 受限于java或者scala中集合元素的限制 通常用于在测试场景中 快速生成RDD 3.2 makeRDD
parallelize与makeRDD类似由于不好读不好记,所以重新封装为makeRDD
//def makeRDD[T: ClassTag](
//通过Scala集合,将Scala集合中的元素放入RDD,作为RDD的元素
// seq: Seq[T],
// 设置生成RDD的分区数(并发度(线程数))
// numSlices: Int = defaultParallelism
// ): RDD[T] =
val rdd = sc.makeRDD(1 to 100000, 10)
val numPartitions = rdd.getNumPartitions
println(numPartitions)
rdd.collect()
//1.1.2 parallelize
// 与makeRDD类似由于不好读不好记,所以重新封装为makeRDD
val rdd1: RDD[Char] = sc.parallelize("sadfasdgdh")
3.2 通过读取文件创建RDD 真正进行海量数据处理时 通常读取类似HDFS的分布式文件系统中的大文件 3.2.1 读取文本文件 默认将每一行作为一个元素
//读取文本文件
val sc = new SparkContext(conf)
val rdd1 = sc.textFile("F:\大数据\weblog\weblog\access.log-20211101")
rdd1.map(line => {
val hour = line.split(":")(1)//小时数
(hour,1)
})
.reduceByKey(_+_)
.sortBy(_._2,false)
.foreach(println)
3.2.2 读取序列化文本文件
val rdd2:RDD[(String,String)] = sc.sequenceFile[String, String]("C:\Users\Administrator\Desktop\output")
rdd2
.flatMap(x => {
val strings = x._2.split("n")
.map(_.split(":")(1))
strings
})
.map((_,1))
.reduceByKey(_+_)
.sortBy(_._2,false)
.foreach(println)
3.2.3 对象文件 objectFile
val rdd23 = sc.objectFile("C:\Users\Administrator\Desktop\output1")
rdd23.foreach(println)
4.RDD的缓存算子
RDD的缓存算子有两个
1.cache
cache()底层调用空参persist() rdd.cache()
在windows环境运行spark
1.2. 空参persist()
底层调用有参的persist(StorageLevel.MEMORY_ONLY)使用仅内存方式缓存RDD的元素 rdd.persist()
1.3. 有参persist()
直接调用有参的persist()并自定义存储级别 rdd.persist(StorageLevel.MEMORY_AND_DISK)
常用的存储级别和优劣
MEMORY_onLY 内存足够的情况下: MEMORY_AND_DISK 如果考虑内存不足会内存溢出异常,则可以选择稳妥的方式,内存足够优先使用内存,内存不够时 将数据缓存到磁盘。 OFF_HEAP 通过非堆存储技术,将RDD中的对象存储在非JVM堆内存,原因是JVM的HEAP中对象在回收(GC)时 ,需要暂停任务线程导致整理效率降低。
2.检查点机制
需求:RDD的缓存会随着任务结束而自动删除,如果需要将计算过程中的某些RDD永久保存在磁盘上,可以使用检查点
checkpoint主要使用场景: 通过在复杂计算任务的中间某些关键环节添加checkpoint 在任务结束后 对关键环节的数据进行复盘,方便错误排查
checkpoint在任务结束前可以代替DISK级别的缓存 任务结束后,checkpoint会保存为随机UUID,每次任务都会单独生成 checkpoint
(1)设置检查点
scala> sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")
(2)创建一个 RDD
scala> val rdd = sc.parallelize(Array("atguigu"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[14] at parallelize at :24
(3)将 RDD 转换为携带当前时间戳并做 checkpoint
scala> val ch = rdd.map(_+System.currentTimeMillis)
ch: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at :26
scala> ch.checkpoint
(4)多次打印结果
scala> ch.collect
res56: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res57: Array[String] = Array(atguigu1538981860504)
scala> ch.collect
res58: Array[String] = Array(atguigu1538981860504)
3、RDD分区
可以通过使用 RDD 的 partitioner 属性来获取 RDD 的分区方式。
获取分区数量
val rdd = sc .makeRDD(1 to 10) val num = rdd.getNumPartitions//获取分区数量,设置cpu为2,则默认分区也为2 println(num) //2
输出每个分区中的元素
// def map[U](f: T => U): RDD[U] =
// def mapPartitions[U](
// f: Iterator[T] => Iterator[U],
// preservesPartitioning: Boolean = false): RDD[U] =
rdd.mapPartitions(iter => {
val str = iter.mkString(",")
Iterator(str)
})
.foreach(println)
RDD[(K,V)]的RDD可以通过partitionBy指定分区器
partitionBy: 分区器Spark默认实现了2种,注意,partitionBy需要二元组才能调用
1、 HashPartitioner
根据key的hashCode与分区数取模 优势: 将相同key分到相同分区 方便进行ByKey操作 缺陷: 如果某些Key较多 会造成数据倾斜 2、 RangePartitioner
使用抽样算法 随机抽取并轮询分发数据到不同分区 优势: 分区后RDD每个分区中的元素数量相差无几 缺陷: 会将数据打散,如果需要进行ByKey需要重新shuffle
3、 如果上述两种分区器不符合业务要求可以自定义分区器类 编写类 继承 Partitioner numPartitions => 返回分区数 getPartition(key:Any) => 返回根据key计算出的分区编号
package com.zch.spark.core.exercise
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
object Exercise_SparkCoreDemo06 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("demo6")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(1 to 10)
rdd.map(i =>(i,i))
.partitionBy(ModPartitioner(2))
.map(_._1)
.mapPartitions(iter =>{
val str = iter.mkString(",")
Iterator(str)
})
.foreach(println)//2,4,6,8,10
// 1,3,5,7,9
}
private def ModPartitioner(num:Int) = {
new Partitioner {
override def numPartitions: Int = num
override def getPartition(key: Any): Int = {
//根据key值 和 numPartitions 取模为奇数或者偶数分区
key.asInstanceOf[Int] % num
}
}
}
}
(1)创建一个 pairRDD scala> val pairs = sc.parallelize(List((1,1),(2,2),(3,3))) pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[3] at parallelize at:24 (2)查看 RDD 的分区器 scala> pairs.partitioner res1: Option[org.apache.spark.Partitioner] = None (3)导入 HashPartitioner 类 scala> import org.apache.spark.HashPartitioner import org.apache.spark.HashPartitioner (4)使用 HashPartitioner 对 RDD 进行重新分区 scala> val partitioned = pairs.partitionBy(new HashPartitioner(2)) partitioned: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[4] at partitionBy at :27 (5)查看重新分区后 RDD 的分区器 scala> partitioned.partitioner res2: Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)



