栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spark分析(十一)Spark Streaming运行流程详解(6)

Spark分析(十一)Spark Streaming运行流程详解(6)

2021SC@SDUSC 前言

上一篇博客讲到了receiverTracker和inputInfoTracker,接下来继续讲解

DStreamGraph

先看ssc.graph.clearmetadata(time)代码

// DStreamGraph.clearmetadata

def clearmetadata(time: Time) {
	logDebug("Clearing metadata for time " + time)
	this.synchronized {
		outputStreams.foreach(_.clearmetadata(time))
	}
	logDebug("Cleared old metadata for time " + time)

}

其中清理了所有OutputDStream的一些相关元数据。
DStream.clearmetadata会清理掉当前DStream的rememberDuration之前的元数据。DStream的子类会覆写此方法。rememberDuration是DStream的成员。
spark.streaming.unpersist(默认是true)的配置可以用来设置是否自动非持久化。这可以显著的减少Spark在RDD上的内存使用,同时也可以改善GC行为。
rememberDuration在启动DStreamGraph时被设置,可参考DStream.initialize,这里不做更细的分析了。源码中的设置大体是sildeDuration,如果设置了checkpointDuration则是2*checkpointDuration.还可以通过DStreamGraph.rememberDuration设置:如果想自行设置,可以在应用程序中使用StreamingContext.remember方法,不过自行设置的值要大于内部计算得到的值时才会生效。
另外,后面的DStream会调整前面的DStream的rememberDuration,例如,如果用了窗口操作,会有跨若干Batch Duration的情况,则在此之前的DStream的rememberDuration都需要加上windowDuration。
最后把依赖的RDD也清理掉了。这是递归调用。

下面分析其中调用的BlockRDD.removeBlocks:

// BlockRDD.removeBlocks

private[spark] def removeBlocks() {
	blockIds.foreach { blockId =>
		sparkContext.env.blockManager.master.removeBlock(blockId)
	}
	 
	_isValid = false

}

利用BlockManagerMaster删除当前RDD相关的所有Block。
如果需要设置检查点,则发送DoCheckpoint消息。
DoCheckpoint消息的发送则暂时不做分析,下面分析不需要设置检查单的情况:

先看jobScheduler.receiverTracker.cleanupOldBlocksAndBatches。

// ReceiverTracker.cleanupOldBlocksAndBatches



def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {
	// Clean up old block and batch metadata
	receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)


	// Signal the receivers to delete old block data
	if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
		logInfo (s"Cleanup old received batch data: $cleanupThreshTime")
		synchronized {
			if (isTrackerStarted) {
				endpoint.send(CleanupOldBlocks(cleanpuThreshTime))
				}
				
			}
			
		}
		

ReceiverTracker.cleanupOldBlocksAndBatches调用了ReceivedBlockTracker.cleanupOldBatches:

// ReceivedBlockTracker.cleanupOldBatches

def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): Unit = synchronized {
	require(cleanupThreshTime.miliseconds < clock.getTimeMolis())
	val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < cleanupThreshTime }.toSeq
	logInfo("Deleting batches " + timesToCleanup)
	if (writeToLog(BatchCleanupEvent(timesToCleanup))) {
		timeToAllocatedBlocks --= timesToCleanup
		writeAheadLogOption.foreach(_.clean(cleanupThreshTime.milliseconds, waitForCompletion))
	} else {
		logWarning("Failed to acknowledge batch clean up in the Write Ahead Log.")
	}

}

cleanupOldBatches清理了旧Batch的Block元数据。
最后回到JobGenerator.clearmetadata,看一看jobScheduler.inputInfoTracker.cleanup:

// inputInfoTracker.cleanup
def cleanup(batchThreshTime: Time): Unit = synchronized {
	val timesToCleanup = batchTimeToInputInfos.keys.filter(_ < batchThreshTime)
	logInfo)s"remove old batch metadata: ${timesToCleanup.mkString("")}")
	batchTimeToInputInfos --= timesToCleanup

}

cleanup清理了旧Batch的跟踪输入源的元数据信息。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/650878.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号