Locate RDD's iterator method:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // The storage level is not NONE, so the RDD has been marked for persistence:
    // fetch the cached partition, or compute it and persist it.
    getOrCompute(split, context)
  } else {
    // Otherwise compute the partition, or read it back from a checkpoint.
    computeOrReadCheckpoint(split, context)
  }
}
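To make the two branches concrete, here is a minimal sketch (not from the Spark source above, and using a made-up local job) showing that persist() is what flips storageLevel away from NONE and routes later actions through getOrCompute:

// Minimal sketch: the second action is served from the cache because
// persist() set a non-NONE storage level on the RDD.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object IteratorBranchSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("iterator-branch"))

    val rdd = sc.parallelize(1 to 100, 4).map(_ * 2)
    rdd.persist(StorageLevel.MEMORY_ONLY) // storageLevel != NONE from now on

    rdd.count() // first action: partitions are computed, then cached via getOrCompute
    rdd.count() // second action: getOrCompute serves the partitions from the block manager

    sc.stop()
  }
}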
Step into computeOrReadCheckpoint:
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    // The RDD has been checkpointed and materialized: delegate to the parent
    // RDD's iterator, which reads the data back from the external file system.
    firstParent[T].iterator(split, context)
  } else {
    // No checkpoint available, so compute the partition the normal way.
    compute(split, context)
  }
}
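For context, a minimal usage sketch of how an RDD ends up in the isCheckpointedAndMaterialized state; the checkpoint directory is a made-up local path, and in production it would normally point at HDFS:

// Sketch: checkpoint() only marks the RDD, the first action materializes
// the checkpoint files, and later computations read them back.
import org.apache.spark.{SparkConf, SparkContext}

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("checkpoint-sketch"))
    sc.setCheckpointDir("/tmp/spark-checkpoint") // usually an HDFS path in production

    val rdd = sc.parallelize(1 to 100, 4).map(_ + 1)
    rdd.checkpoint() // only marks the RDD; nothing is written yet
    rdd.count()      // first action: checkpoint files are written at the end of this job

    // From here on isCheckpointedAndMaterialized is true, so this action reads the
    // partitions back from the checkpoint directory instead of recomputing them.
    rdd.count()

    sc.stop()
  }
}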
private[spark] def isCheckpointedAndMaterialized: Boolean =
  checkpointData.exists(_.isCheckpointed)

// Get the first parent RDD. Once an RDD has been checkpointed, its original
// dependencies are replaced by a single one-to-one dependency on a
// ReliableCheckpointRDD, so firstParent here is that checkpoint RDD and its
// iterator reads from the checkpoint files.
protected[spark] def firstParent[U: ClassTag]: RDD[U] = {
  dependencies.head.rdd.asInstanceOf[RDD[U]]
}
Reading the checkpoint: ReliableCheckpointRDD overrides compute to open the file that backs the requested partition.
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
  // Resolve the checkpoint file that backs this partition, then read it back.
  val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
  ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}
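As a side note, the file name above is simply the partition index rendered as part-NNNNN. A small sketch of the mapping; the format string matches checkpointFileName in the Spark source being read here, but verify it against your version:

// Sketch: how a partition index maps to a checkpoint file under checkpointPath.
object CheckpointFileNameSketch {
  def checkpointFileName(partitionIndex: Int): String = "part-%05d".format(partitionIndex)

  def main(args: Array[String]): Unit = {
    // Partition 3 of an RDD checkpointed under hdfs://ns/ckpt/rdd-42 would be
    // read from hdfs://ns/ckpt/rdd-42/part-00003 (the path is made up).
    println(checkpointFileName(3)) // part-00003
  }
}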
As you can see, it uses the Hadoop API to read the data back from HDFS:
def readCheckpointFile[T](
    path: Path,
    broadcastedConf: Broadcast[SerializableConfiguration],
    context: TaskContext): Iterator[T] = {
  val env = SparkEnv.get
  // Resolve the Hadoop FileSystem for the checkpoint path (HDFS in the usual case).
  val fs = path.getFileSystem(broadcastedConf.value.value)
  val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
  // Open the file through the Hadoop API.
  val fileInputStream = fs.open(path, bufferSize)
  val serializer = env.serializer.newInstance()
  val deserializeStream = serializer.deserializeStream(fileInputStream)
  // Register an on-task-completion callback to close the input stream.
  context.addTaskCompletionListener(context => deserializeStream.close())
  deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}
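To see the same Hadoop read pattern outside of Spark's internals, here is a standalone sketch that opens a path through FileSystem and stream-deserializes it with Spark's JavaSerializer. The path and buffer size are placeholders, not values taken from the source above, and it assumes the file was written with the matching serializeStream:

// Sketch: open an HDFS file via the Hadoop API and lazily deserialize it,
// mirroring what readCheckpointFile does inside a task.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

object HadoopReadSketch {
  def main(args: Array[String]): Unit = {
    val path = new Path("hdfs:///tmp/demo/part-00000") // hypothetical checkpoint file
    val fs = path.getFileSystem(new Configuration())   // same API call the source uses
    val in = fs.open(path, 65536)                       // 64 KB buffer, like the spark.buffer.size default

    val serializer = new JavaSerializer(new SparkConf()).newInstance()
    val deserializeStream = serializer.deserializeStream(in)
    try {
      // asIterator lazily reads objects until EOF, just like readCheckpointFile.
      deserializeStream.asIterator.foreach(println)
    } finally {
      deserializeStream.close()
    }
  }
}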



