栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark分析(七)Spark Streaming运行流程详解(2)

Spark分析(七)Spark Streaming运行流程详解(2)

2021SC@SDUSC 前言

上一篇博客分析了StreamingContext的部分代码,本篇将继续分析DStream.print
下图为DStream的输出操作print的流程

图1

先来看看DStream.print是如何把ForeachDStream对象注册到DStreamGraph中的。

// DStream.print

 
def print():Unit = ssc.withScope {
	print(10)
}

其中的print调用了另一个重载的print

// DStream.rpint


def print(num: Int): Unit = ssc.withScope {
	def foreachFunc: (RDD[T], Time) => Unit = {
		(rdd: RDD[T], time: Time) => {
			val firstNum = rdd.take(num + 1)
			// scalastyle:off println
			println("-----------------------")
			println("Time: " + time)
			println("-----------------------")
			firstNum.take(num).foreach(println)
			if (firstNum.length > num) println("...")
			println()
			// scalastyle:on println

		}
	}
	foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)

}

先定义用于打印输出的函数foreachFunc,该函数没有被执行。然后调用foreachRDD,foreachRDD的参数中有函数foreachFunc。

// DStream.foreachRDD


private def foreachRDD(
						foreachFunc: (RDD[T], Time( => Unit,
						displayInnerRDDOps: Boolean): Unit = {
		new ForEachDStream(this,
			context.sparkContext.clean(foreachFunc. false), displayInnerRDDOps).register()
	}
	

有新创建的ForeachDStream对象,DStream输出操作中定义的函数被封装在其中,仍没有被执行。这个ForeachDStream对象还调用了register方法。

// DStream.register


private[streaming] def register(): DStream[T] = {
	ssc.graph.addOutputStream(this)
	this

}

def addOutputStream(outputStream: DStream[_]) {
	this.synchronized {
		outputStream.setGraph(this)
		outputStreams += outputStream
	}

}

因此,在注册时,会将当前的ForeachDStream对象加入到graph的outputStreams中。
Spark应用程序一般都不是简单的本地程序,而是要在分布式环境下运行的。StreamingContext启动时,需要在分布式集群环境中做好各项准备工作,然后开始数据接收,并且在每个批处理时间把业务处理部分分解成若干个Job,提交到Spark集群中的各个Executor上执行。

StreamingContext启动流程

下面稍微细化以下StreamingContext启动过程。
首先是数据接收和分配。业务处理部分定义了InputDStream子类,其中一般还会定义具体的流数据接收器。在Executor上用Receiver接收流数据,然后缓存下来,累计成块(Block),再分配给相应流处理时间间隔内的Job。在Driver端需要有个跟踪器(Tracker),这个跟踪器不仅督促远端的Executor启动Receiver,还要管理待分配给Job的数据Block的原数据。这个跟踪器叫ReceiverTracker。
其次是定期不断生成数据处理Job。像NetworkWordCount这样的程序中定义了初始的InputDStream及其后续的操作,需要转化为相应的RDD,然后根据流处理时间间隔不断生成Job,并提交到Executor上执行。这项工作需要在Driver上有个Job的生成器(Generator)来负责,这个生成器叫JobGenerator。

从StreamingContext的启动开始分析

// StreamingContext.start

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/584437.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号