栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

(7)RDD缓存 & 检查点

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

(7)RDD缓存 & 检查点

RDD缓存 & 检查点

RDD缓存

缓存的作用怎么缓存?

persist方法 & cache方法方法源码解析 RDD CheckPoint(检查点)

CheckPoint的作用 缓存和检查点区别建议检查点 和 缓存 联合使用

RDD缓存 缓存的作用
    将计算结果缓存到内存或磁盘中,供后面重用。
    因为RDD中不存储数据,如果一个RDD需要重复使用,那么需要从头再次执行来获取数据,所以为了解决这个问题,RDD提供了缓存的方法在数据执行较长,或数据比较重要的场合也可以采用持久化操作。
    如果数据后续出现问题,就不用从头再执行一遍了,提高了数据的安全性
怎么缓存? persist方法 & cache方法

RDD通过persist方法或cache方法可以将前面的计算结果缓存
( 但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。)

	//cache 仅保存在内存中
    reduceRdd.cache()  
    //persist 可以更改存储级别
    reduceRdd.persist(StorageLevel.MEMORY_AND_DISK)   //保存在内存和磁盘中
    reduceRdd.persist(StorageLevel.DISK_ONLY) //仅保存在磁盘中
方法源码解析

通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,
他们的区别是,cache仅能存在内存中,persist 可通过StorageLevel 自定义存储级别,确定存储位置。

Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
在存储级别的末尾加上“_2”来把持久化数据存为两份

  //NONE 就是不缓存,等于没用这个方法
  val NONE = new StorageLevel(false, false, false, false)
  //只写到磁盘去,比较安全,但是性能低,因为涉及到io,所以性能低
  val DISK_onLY = new StorageLevel(true, false, false, false)
  //写到磁盘,有两个备份
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  //只写道内存中,不安全,但性能高
  val MEMORY_onLY = new StorageLevel(false, true, false, true)
  //写到内存中,有两个备份
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  //序列化后再写到内存
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  //序列化后写到内存,有两个备份
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  //如果内存中放不下,则写到磁盘
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  //如果内存中放不下,则写到磁盘,有两个备份
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  //如果内存中放不下,则写到磁盘,在内存中放序列化后的数据
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  //如果内存中放不下,则写到磁盘,在内存中放序列化后的数据,有两个备份
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  //写到堆内存之外的系统内存
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

RDD CheckPoint(检查点) CheckPoint的作用

检查点其实就是通过将RDD中间结果写入磁盘。

由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

// 设置检查点路径
sc.setCheckpointDir("./checkpoint1")

// 创建一个RDD,读取指定位置文件:hello atguigu atguigu
val lineRdd: RDD[String] = sc.textFile("input/1.txt")

// 业务逻辑
val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
    word => {
        (word, System.currentTimeMillis())
    }
}

// 增加缓存,避免再重新跑一个job做checkpoint
wordToOneRdd.cache()
// 数据检查点:针对wordToOneRdd做检查点计算
wordToOneRdd.checkpoint()

// 触发执行逻辑
wordToOneRdd.collect().foreach(println)
缓存和检查点区别

    数据保存时间不同
    1.1 缓存是 将数据临时保存,如果作业执行完毕,临时保存的数据文件就会丢失。
    Cache 将数据临时保存在内存中进行数据重用。

    persist 将数据临时存储在磁盘文件中进行数据重用,涉及到磁盘io,性能较低,但是数据安全。

    1.2 Checkpoint 将数据长久的保存在磁盘文件中,进行数据重用。

    Checkpoint检查点切断血缘依赖,从检查点开始重做血缘。
    缓存会在血缘关系中添加新的依赖。

未执行缓存和检查点之前的血缘关系

执行缓存后,会在血缘关系中添加新的依赖

Checkpoint检查点切断血缘依赖,从检查点开始重做血缘。

建议检查点 和 缓存 联合使用

为了保证数据安全,所以一般情况下,会独立执行作业(job)。当行动算子触发执行时,Checkpoint会产生一个新的作业(job),也就是作业会从头执行两遍,效率太低。
为了提高效率,需要和Cache联合使用,这样checkpoint的job只需从Cache缓存中读取数据即可,不需要再从头计算一次RDD。

如图源码所示,在Checkpoint中,会再单独执行一次runJob

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/749172.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号