上一篇博客分析了StreamingContext的部分代码,本篇将继续分析DStream.print
下图为DStream的输出操作print的流程
先来看看DStream.print是如何把ForeachDStream对象注册到DStreamGraph中的。
// DStream.print
def print():Unit = ssc.withScope {
print(10)
}
其中的print调用了另一个重载的print
// DStream.rpint
def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-----------------------")
println("Time: " + time)
println("-----------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
}
}
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
先定义用于打印输出的函数foreachFunc,该函数没有被执行。然后调用foreachRDD,foreachRDD的参数中有函数foreachFunc。
// DStream.foreachRDD
private def foreachRDD(
foreachFunc: (RDD[T], Time( => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc. false), displayInnerRDDOps).register()
}
有新创建的ForeachDStream对象,DStream输出操作中定义的函数被封装在其中,仍没有被执行。这个ForeachDStream对象还调用了register方法。
// DStream.register
private[streaming] def register(): DStream[T] = {
ssc.graph.addOutputStream(this)
this
}
def addOutputStream(outputStream: DStream[_]) {
this.synchronized {
outputStream.setGraph(this)
outputStreams += outputStream
}
}
因此,在注册时,会将当前的ForeachDStream对象加入到graph的outputStreams中。
Spark应用程序一般都不是简单的本地程序,而是要在分布式环境下运行的。StreamingContext启动时,需要在分布式集群环境中做好各项准备工作,然后开始数据接收,并且在每个批处理时间把业务处理部分分解成若干个Job,提交到Spark集群中的各个Executor上执行。
下面稍微细化以下StreamingContext启动过程。
首先是数据接收和分配。业务处理部分定义了InputDStream子类,其中一般还会定义具体的流数据接收器。在Executor上用Receiver接收流数据,然后缓存下来,累计成块(Block),再分配给相应流处理时间间隔内的Job。在Driver端需要有个跟踪器(Tracker),这个跟踪器不仅督促远端的Executor启动Receiver,还要管理待分配给Job的数据Block的原数据。这个跟踪器叫ReceiverTracker。
其次是定期不断生成数据处理Job。像NetworkWordCount这样的程序中定义了初始的InputDStream及其后续的操作,需要转化为相应的RDD,然后根据流处理时间间隔不断生成Job,并提交到Executor上执行。这项工作需要在Driver上有个Job的生成器(Generator)来负责,这个生成器叫JobGenerator。
从StreamingContext的启动开始分析
// StreamingContext.start



