栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark-checkpoint机制的读写流程

Spark-checkpoint机制的读写流程

Spark_checkpoint机制简介 :

首先明确RDD是一个分布式弹性数据集 , 但是RDD中不存数据 , 只存计算逻辑 数据地址和父RDD血缘关系等

在spark计算过程中 , 业务负责 , 计算流程DAG比较长且数据重要不可丢失 , 并且中间RDD需要多次复用或需要在其他模块中调用 , 需要使用spark_RDD的checkpoint机制 , 将中间结果RDD持久化到磁盘

理解RDD.cache()和RDD.checkpoint()的区别 : 

cache()缓存 :

适用于本次Job多次复用某个DAG链较长的中间RDD , 缓存以后不用每次都从头计算 , 二次复用会调用缓存中的RDD结果数据集 , 省去前边冗余的计算流程链 , 节省资源 , 提高效率

checkpoint() :

RDD结果数据持久化到一个高可用的位置(HDFS) , 我这里测试直接落地到本地磁盘 , 省去前边负责冗余的计算流程链 , 节省资源 , 提高效率 , 数据多次复用与其他板块

话不多说直接上代码:

将重要的中间结果RDD.checkpoint持久化数据到本地磁盘 , 如下

object checkPointRDDToDir {
  def main(args: Array[String]): Unit = {
    //创建conf对象
    val conf = new SparkConf().setAppName("checkpoint").setMaster("local[2]")
    //创建sparkContext对象
    val sparkContext = new SparkContext(conf)
    //本地集合并行化创建RDD 指定并行度(分区数)为 2
    val rdd: RDD[Int] = sparkContext.parallelize(List[Int](1, 2, 3, 4, 5, 6, 7, 8), 2)
    //假设一段超长计算流程DAG.... 得到结果数据RDD
    val resRDD: RDD[Int] = rdd.map(_ * 10).map(_ * 2).map(_ + 10).map(_ * 5).map(_ - 100)
    //resRDD 缓存 & checkpoint
    resRDD.persist(StorageLevel.MEMORY_ONLY)  //缓存级别只存于内存中
    sparkContext.setCheckpointDir("E:\checkpoint\out_01")  //checkpoint之前设置目的地文件夹
    resRDD.checkpoint()   //中间结果数据持久化到磁盘

    //转换算子无法触发执行 调用行动算子触发执行
    resRDD.collect()
  }
}

目标文件夹不能存在 , 运行成功后目标路径下会自动产生一个结果数据文件夹

 这样数据就持久化到磁盘了 , 需要注意的是 , checkpoint后的RDD数据已经断开了和原父RDD的血缘关系 , 成为了一个独立个体 , 重新获取成RDD也是一个全新的RDD , 不存在之前的任何计算逻辑等等 , 这个要搞清楚~~

将checkpoint后的数据反向获取成一个新的RDD:

注意 : 这里有个小细节,SparkContext类中存在一个获取checkpoint持久化RDD数据的方法,但是这个方法是 protected 修饰 , 受保护的方法 , 只能在本类中调用或者同一个包下的类调用 ,这时候我们可以自己创建一个包名 org.apache.spark 这一招 [瞒天过海] 然后在这个包下写一个自己的工具类去调用这个方法, 就可以获取到 checkpoint 文件啦~~

自定义   org.apache.spark.RDDUtil

package org.apache.spark
import org.apache.spark.rdd.RDD

object RDDUtil {
                                  //返回值类型记得和原checkpointRDD一致
  def getCheckPointRDD(path:String): RDD[Int] ={
    //创建sparkContext对象
    val sparkContext = new SparkContext(new SparkConf().setAppName("Get").setMaster("local[*]"))
    //同包下调用checkpointFile方法 此处默认返回值类型是 RDD[Nothing] 改成原RDD类型
    val resRDD: RDD[Int] = sparkContext.checkpointFile(path)
    //返回一个RDD
    resRDD
  }
}
反向获取checkpoint文件返回新的RDD
import org.apache.spark.RDDUtil
import org.apache.spark.rdd.RDD


object checkPointBackToRDD {
  def main(args: Array[String]): Unit = {
    //注意这里的地址要写到具体文件的上一层文件夹
    val path:String = "E:\checkpoint\out_01\2198d8bd-96c9-4e7c-950c-7fe6d8d212c3\rdd-5"
    //调用方法返回RDD
    val rdd: RDD[Int] = RDDUtil.getCheckPointRDD(path)
    //查看RDD中数据
    rdd.foreach(println)
  }
}
♥ 看完别忘点赞哦 ♥
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699522.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号