
Spark Analysis (10): Spark Streaming Execution Flow Explained (5)


2021SC@SDUSC

Preface

The previous post finished analyzing the initial data-receiving flow of Spark Streaming. This post analyzes how Spark Streaming cleans up its data.

Spark Streaming Data Cleanup

A Spark Streaming application runs continuously. Without effective memory management, memory can be exhausted quickly, so Spark Streaming has its own cleanup mechanism for the objects, data, and metadata it produces while operating on DStreams.
First, the overall flow chart of data cleanup:

Figure 1

Before the cleanup itself, there is JobScheduler.JobHandler.run, which executes each generated job:

// JobScheduler.JobHandler.run

def run() {
  try {
    ...
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
      }
    } else {
      // JobScheduler has been stopped.
    }
  } finally {
    ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
    ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
  }
}
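Note that job.run() above is wrapped in PairRDDFunctions.disableOutputSpecValidation.withValue(true) { ... }. That is Scala's DynamicVariable pattern: a thread-local value overridden only for the dynamic extent of a block, then restored. A minimal sketch of the idea (the Flags object and field name here are illustrative, not Spark's):

```scala
import scala.util.DynamicVariable

// DynamicVariable holds a per-thread value that withValue overrides only for
// the duration of the given block, restoring the old value afterwards. This is
// the same mechanism behind disableOutputSpecValidation.withValue(true).
object Flags {
  val skipValidation = new DynamicVariable[Boolean](false)
}

def validationSkipped: Boolean = Flags.skipValidation.value

val inside  = Flags.skipValidation.withValue(true) { validationSkipped }
val outside = validationSkipped
// inside is true; outside is back to the default, false
```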
	

run posts a JobCompleted message when the job finishes. JobScheduler.processEvent defines how such messages are handled:

// JobScheduler.processEvent

private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}

The JobCompleted event is handled by calling handleJobCompletion.

// JobScheduler.handleJobCompletion

private def handleJobCompletion(job: Job, completedTime: Long) {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.toString,
      jobSet.processingDelay / 1000.0
    ))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }
  job.result match {
    case Failure(e) =>
      reportError("Error running job " + job, e)
    case _ =>
  }
}

Besides removing the completed JobSet from jobSets, it also calls jobGenerator.onBatchCompletion:

// JobGenerator.onBatchCompletion

def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}

This posts a ClearMetadata message. Next, look at the definition of eventLoop in JobGenerator.start:

// JobGenerator.start (excerpt)

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
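The EventLoop pattern itself is simple: post enqueues an event, and a dedicated thread dequeues events one at a time and hands each to onReceive. A minimal, self-contained sketch of the idea (MiniEventLoop is an illustrative stand-in, not Spark's actual EventLoop class):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Minimal sketch of the post/onReceive pattern: events posted from any thread
// are queued and handled one at a time on a dedicated event thread.
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val eventThread = new Thread(name) {
    override def run(): Unit =
      try {
        while (!stopped) {
          val event = queue.take()
          try onReceive(event)
          catch { case t: Throwable => onError(t) }
        }
      } catch {
        case _: InterruptedException => // loop was stopped while blocked on take()
      }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = { stopped = true; eventThread.interrupt() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
```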

As the code shows, messages are handled by JobGenerator.processEvent, which eventLoop.onReceive delegates to:

// JobGenerator.processEvent

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

The handler for the metadata-cleanup (ClearMetadata) message is clearMetadata:

// JobGenerator.clearMetadata

private def clearMetadata(time: Time) {
  ssc.graph.clearMetadata(time)

  // If checkpointing is enabled, then checkpoint,
  // else mark batch to be fully processed.
  if (shouldCheckpoint) {
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = true))
  } else {
    // If checkpointing is not enabled, then delete metadata information about
    // received blocks (block data is not saved in any case). Otherwise, wait
    // for checkpointing of this batch to complete.
    val maxRememberDuration = graph.getMaxInputStreamRememberDuration()
    jobScheduler.receiverTracker.cleanupOldBlocksAndBatches(time - maxRememberDuration)
    jobScheduler.inputInfoTracker.cleanup(time - maxRememberDuration)
    markBatchFullyProcessed(time)
  }
}

As can be seen, several kinds of cleanup take place here. The receiverTracker and inputInfoTracker cleanup runs immediately only when checkpointing is disabled; when checkpointing is enabled, that cleanup is deferred until the checkpoint for this batch completes.
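Putting the whole chain together (JobCompleted triggers handleJobCompletion, which triggers onBatchCompletion, which posts ClearMetadata, which triggers clearMetadata), the branch logic can be condensed into a small model. The CleanupModel class and its fields below are simplified stand-ins for the Spark components above, not real Spark code:

```scala
// Condensed model of the cleanup chain described above:
// JobCompleted -> handleJobCompletion -> onBatchCompletion -> ClearMetadata
// -> clearMetadata. All bodies are simplified stand-ins.
sealed trait Event
case class JobCompleted(timeMs: Long) extends Event
case class ClearMetadata(timeMs: Long) extends Event

class CleanupModel(checkpointEnabled: Boolean, rememberMs: Long) {
  var cleanedUpTo: Option[Long] = None          // trackers cleaned up to this time
  var checkpointRequested: Option[Long] = None  // DoCheckpoint posted for this batch

  def handle(event: Event): Unit = event match {
    case JobCompleted(t) =>
      // Batch done: the JobSet is removed and ClearMetadata is posted
      // for the batch time.
      handle(ClearMetadata(t))
    case ClearMetadata(t) =>
      if (checkpointEnabled) checkpointRequested = Some(t)  // defer tracker cleanup
      else cleanedUpTo = Some(t - rememberMs)               // clean old blocks/batches now
  }
}
```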

Source: https://www.mshxw.com/it/654507.html