栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark源码——CheckPoint原理

Spark源码——CheckPoint原理


(图片来源:北风网)

找到RDD的iterator方法

  /**
   * Returns an iterator over the records of partition `split`.
   *
   * If this RDD has been given a storage level other than `StorageLevel.NONE`, the
   * work goes through `getOrCompute`. Judging by its name, that reads the persisted
   * partition when present and computes it otherwise; its body is not shown here.
   * For a non-persisted RDD, `computeOrReadCheckpoint` either reads materialized
   * checkpoint data or computes the partition from scratch.
   *
   * @param split   the partition to iterate
   * @param context the context of the running task
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] =
    if (storageLevel == StorageLevel.NONE) {
      // Not persisted: read the checkpoint if one is available, otherwise compute.
      computeOrReadCheckpoint(split, context)
    } else {
      // Persisted: let the persistence path supply (or compute) the data.
      getOrCompute(split, context)
    }

进入computeOrReadCheckpoint

 
  /**
   * Produces partition `split` either from checkpointed data or by computing it.
   *
   * Without a materialized checkpoint, the partition is computed via `compute`.
   * Once the RDD is checkpointed and materialized, iteration is delegated to the first
   * parent's `iterator`. As the surrounding article explains, that parent is then the
   * checkpoint RDD, so this reads the data back from the external checkpoint storage.
   *
   * @param split   the partition to produce
   * @param context the context of the running task
   */
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
    if (!isCheckpointedAndMaterialized) compute(split, context)
    else firstParent[T].iterator(split, context)
  
  /**
   * True when `checkpointData` holds an entry whose `isCheckpointed` flag is set.
   * Otherwise, including when `checkpointData` holds no entry, this is false.
   */
  private[spark] def isCheckpointedAndMaterialized: Boolean = {
    checkpointData.exists { data => data.isCheckpointed }
  }
 
 //获取父RDD
 //
  /**
   * Returns the RDD of this RDD's first dependency, viewed as an `RDD[U]`.
   *
   * The cast is unchecked because the element type is erased at runtime, so callers
   * must know the parent's element type. Taking `head` fails if `dependencies` is empty.
   *
   * @tparam U expected element type of the parent RDD
   */
  protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
    val firstDependency = dependencies.head
    firstDependency.rdd.asInstanceOf[RDD[U]]
  }

读取checkpoint(checkpoint RDD 的 compute 方法:根据分区号定位并读取对应的 checkpoint 文件)

  /**
   * Computes partition `split` by reading its checkpoint file.
   *
   * The file lives under `checkpointPath`, and its name is derived from the partition
   * index by `ReliableCheckpointRDD.checkpointFileName`. Its records are read through
   * `ReliableCheckpointRDD.readCheckpointFile`.
   *
   * @param split   the partition whose checkpoint file should be read
   * @param context the context of the running task
   */
  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    val partitionFile =
      new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
    ReliableCheckpointRDD.readCheckpointFile(partitionFile, broadcastedConf, context)
  }

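可以看到,这里通过 Hadoop 的 FileSystem API(path.getFileSystem 与 fs.open)读取 HDFS(或其他 Hadoop 兼容文件系统)上的 checkpoint 文件,再用 Spark 配置的序列化器反序列化出记录。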

 
  /**
   * Reads one checkpointed partition file and returns its records as an iterator.
   *
   * The file system is resolved from `path` using the broadcast Hadoop configuration,
   * so this goes through Hadoop's `FileSystem` API (HDFS or another Hadoop-compatible
   * store). Records are decoded with the serializer configured in `SparkEnv`.
   *
   * @param path            location of the partition's checkpoint file
   * @param broadcastedConf broadcast Hadoop configuration used to resolve the file system
   * @param context         task context; the stream is closed when this task completes
   * @tparam T              element type stored in the checkpoint file
   * @return an iterator over the deserialized records
   */
  def readCheckpointFile[T](
      path: Path,
      broadcastedConf: Broadcast[SerializableConfiguration],
      context: TaskContext): Iterator[T] = {
    val env = SparkEnv.get
    // Resolve the FileSystem that owns this path (e.g. HDFS).
    val fs = path.getFileSystem(broadcastedConf.value.value)
    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    val fileInputStream = fs.open(path, bufferSize)
    val serializer = env.serializer.newInstance()
    val deserializeStream =
      try {
        serializer.deserializeStream(fileInputStream)
      } catch {
        case scala.util.control.NonFatal(e) =>
          // Creating the deserialization stream may already read from the file (e.g. a
          // serialization header) and fail. The completion listener below is not yet
          // registered, so close the raw stream here to avoid leaking the file handle.
          try fileInputStream.close()
          catch { case scala.util.control.NonFatal(closeError) => e.addSuppressed(closeError) }
          throw e
      }

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener(_ => deserializeStream.close())

    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
  }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/673435.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号