org.apache.spark.memory.UnifiedMemoryManager#getMaxMemory

private def getMaxMemory(conf: SparkConf): Long = {
  val systemMemory = conf.get(TEST_MEMORY)
  val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
    if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
  }
  // SPARK-12759 Check executor memory to fail fast if memory is insufficient
  if (conf.contains(config.EXECUTOR_MEMORY)) {
    val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.")
    }
  }
  val usableMemory = systemMemory - reservedMemory
  val memoryFraction = conf.get(config.MEMORY_FRACTION)
  (usableMemory * memoryFraction).toLong
}
This method determines the maximum amount of memory that execution and storage combined can obtain. The key lines are:

val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.get(config.MEMORY_FRACTION)
(usableMemory * memoryFraction).toLong
Three variables are involved: the reserved system memory, the total system memory, and the memory fraction.
The reserved system memory is fixed at 300 MB:

private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
The total system memory is obtained from Runtime.getRuntime.maxMemory:

val TEST_MEMORY = ConfigBuilder("spark.testing.memory")
  .version("1.6.0")
  .longConf
  .createWithDefault(Runtime.getRuntime.maxMemory)
The memory fraction defaults to 0.6:

private[spark] val MEMORY_FRACTION = ConfigBuilder("spark.memory.fraction")
  .doc("Fraction of (heap space - 300MB) used for execution and storage. The " +
    "lower this is, the more frequently spills and cached data eviction occur. " +
    "The purpose of this config is to set aside memory for internal metadata, " +
    "user data structures, and imprecise size estimation in the case of sparse, " +
    "unusually large records. Leaving this at the default value is recommended. ")
  .version("1.6.0")
  .doubleConf
  .createWithDefault(0.6)
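Putting the three quantities together, the final formula in getMaxMemory can be reproduced with a few lines of arithmetic. A minimal sketch (the 1 GiB heap below is a hypothetical input, not a Spark default):

```scala
object MaxMemorySketch {
  // Fixed reserved memory, mirroring RESERVED_SYSTEM_MEMORY_BYTES
  val ReservedBytes: Long = 300L * 1024 * 1024

  // Reproduces getMaxMemory's final step: (system - reserved) * fraction
  def maxMemory(systemMemory: Long, memoryFraction: Double = 0.6): Long = {
    val usableMemory = systemMemory - ReservedBytes
    (usableMemory * memoryFraction).toLong
  }

  def main(args: Array[String]): Unit = {
    val heap = 1024L * 1024 * 1024      // hypothetical 1 GiB heap
    println(maxMemory(heap))            // 455501414 bytes, about 434.4 MB
  }
}
```

So with the defaults, execution plus storage get roughly 60% of whatever is left after subtracting the fixed 300 MB reserve.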
org.apache.spark.deploy.yarn.YarnAllocator shows how the executor's memory request is assembled:

// Resource capability requested for each executor
private[yarn] val resource: Resource = {
  val resource: Resource = Resource.newInstance(
    executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
  ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
  logDebug(s"Created resource capability: $resource")
  resource
}
The executor's memory is the sum of four parts: executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory.
These four parts come from:
// Executor memory in MiB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
  math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
protected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) {
  sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
} else {
  0
}
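The container size requested from YARN is simply the sum of those four components, in MiB. A minimal sketch of that sum (the input values below are hypothetical, chosen only to illustrate a common case):

```scala
object ContainerMemorySketch {
  // Mirrors the memory argument passed to Resource.newInstance: four parts summed, in MiB
  def containerMemoryMiB(executorMemory: Int,
                         executorOffHeapMemory: Int,
                         memoryOverhead: Int,
                         pysparkWorkerMemory: Int): Int =
    executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory

  def main(args: Array[String]): Unit = {
    // hypothetical: 2 GiB heap, no off-heap, 384 MiB overhead, no PySpark worker
    println(containerMemoryMiB(2048, 0, 384, 0)) // 2432
  }
}
```

This is why a container granted by YARN is always larger than the configured --executor-memory.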
The corresponding configuration entries are:
Executor memory:
private[spark] val EXECUTOR_MEMORY = ConfigBuilder(SparkLauncher.EXECUTOR_MEMORY)
  .doc("Amount of memory to use per executor process, in MiB unless otherwise specified.")
  .version("0.7.0")
  .bytesConf(ByteUnit.MiB)
  .createWithDefaultString("1g")
Executor off-heap memory: first check whether off-heap is enabled, then require that the configured off-heap size is greater than 0:
def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
  if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
    val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
    require(sizeInMB > 0,
      s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
    sizeInMB
  } else {
    0
  }
}
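The same decision logic can be exercised outside Spark. A minimal sketch, where the boolean flag and the size stand in for MEMORY_OFFHEAP_ENABLED and MEMORY_OFFHEAP_SIZE:

```scala
object OffHeapSketch {
  // Returns the off-heap size in MiB if enabled, after validating it is positive
  def offHeapMiB(offHeapEnabled: Boolean, offHeapSizeMiB: Long): Long =
    if (offHeapEnabled) {
      require(offHeapSizeMiB > 0, "off-heap size must be > 0 when off-heap is enabled")
      offHeapSizeMiB
    } else 0L

  def main(args: Array[String]): Unit = {
    println(offHeapMiB(offHeapEnabled = false, offHeapSizeMiB = 512)) // 0: the flag wins
    println(offHeapMiB(offHeapEnabled = true, offHeapSizeMiB = 512))  // 512
  }
}
```

Note that a configured size is silently ignored when the enable flag is off; it only gets validated once off-heap is turned on.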
Memory overhead:

private[spark] val EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.executor.memoryOverhead")
  .doc("The amount of non-heap memory to be allocated per executor, in MiB unless otherwise" +
    " specified.")
  .version("2.3.0")
  .bytesConf(ByteUnit.MiB)
  .createOptional
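Since this entry is optional, the YarnAllocator snippet above falls back to max(factor × executorMemory, minimum) when it is unset. A minimal sketch of that fallback, assuming the constants MEMORY_OVERHEAD_FACTOR = 0.10 and MEMORY_OVERHEAD_MIN = 384 MiB used by the YARN code:

```scala
object OverheadSketch {
  // Assumed values of MEMORY_OVERHEAD_FACTOR and MEMORY_OVERHEAD_MIN
  val OverheadFactor: Double = 0.10
  val OverheadMinMiB: Int = 384

  // Default overhead when spark.executor.memoryOverhead is not set
  def defaultOverheadMiB(executorMemoryMiB: Int): Int =
    math.max((OverheadFactor * executorMemoryMiB).toInt, OverheadMinMiB)

  def main(args: Array[String]): Unit = {
    println(defaultOverheadMiB(2048)) // 0.1 * 2048 = 204 < 384, so the floor applies: 384
    println(defaultOverheadMiB(8192)) // 0.1 * 8192 = 819
  }
}
```

So for small executors the overhead is the flat 384 MiB floor; it only scales with the heap once the heap exceeds roughly 3.75 GiB.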
The fourth part, the PySpark worker memory, is not considered here.
To verify that the rules above hold, start a spark-shell locally with driver memory set to 2 GB:

spark-shell --conf spark.driver.memory=2g
Fetch the relevant values:

// Get the driver memory; in local mode the driver acts as the single executor
scala> sc.getConf.getSizeAsBytes("spark.driver.memory")
res0: Long = 2147483648
// Get the system memory
scala> val systemMemory = Runtime.getRuntime.maxMemory
systemMemory: Long = 1908932608
// Fixed reserved system memory of 300 MB
scala> val reservedMemory = 300 * 1024 * 1024
reservedMemory: Int = 314572800
// Memory fraction, default 0.6
scala> val memoryFraction = sc.getConf.getDouble("spark.memory.fraction", 0.6)
memoryFraction: Double = 0.6
// System memory minus reserved memory
scala> val usableMemory = systemMemory - reservedMemory
usableMemory: Long = 1594359808
// Maximum memory according to the formula
scala> val maxMemory = (usableMemory * memoryFraction).toLong
maxMemory: Long = 956615884
// Convert the maximum memory to MB
scala> 956615884/1024/1024d
res6: Double = 912.2998046875
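The shell session above condenses into one self-contained check of the 912.3 MB figure (the systemMemory value is the one observed in this particular session, not a constant):

```scala
object StorageMemoryCheck {
  val systemMemory: Long = 1908932608L          // Runtime.getRuntime.maxMemory observed above
  val reservedMemory: Long = 300L * 1024 * 1024 // fixed reserved system memory
  val memoryFraction: Double = 0.6              // spark.memory.fraction default
  val maxMemory: Long = ((systemMemory - reservedMemory) * memoryFraction).toLong

  def main(args: Array[String]): Unit = {
    println(maxMemory)                // 956615884
    println(maxMemory / 1024 / 1024d) // 912.2998046875, the 912.3 MB shown in the UI
  }
}
```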
The Spark web UI indeed shows Storage Memory as 912.3 MB, matching the calculation.
Reference: apache spark - How does web UI calculate Storage Memory (in Executors tab)? - Stack Overflow
Note also that the value returned by Runtime.getRuntime.maxMemory is smaller than the configured 2 GB:
scala> 1908932608/1024/1024d
res3: Double = 1820.5
This is because the JVM heap is split into an old generation and a young generation, and the young generation contains one Eden space plus two Survivor spaces, of which only one is usable at any time; the reported available memory is therefore smaller than the configured heap size.
Next, launch a spark-submit job with GC logging enabled:
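A quick back-of-the-envelope check of that explanation: the gap between -Xmx and Runtime.maxMemory should be roughly one survivor space. The sketch below only reproduces the arithmetic from this session; the claim that the 227.5 MiB gap equals one survivor space is approximate, since the actual survivor size depends on -XX:NewRatio and -XX:SurvivorRatio:

```scala
object SurvivorGapSketch {
  val xmxMiB: Double = 2048.0                          // -Xmx from --driver-memory 2g
  val reportedMiB: Double = 1908932608L / 1024 / 1024d // Runtime.getRuntime.maxMemory observed
  val gapMiB: Double = xmxMiB - reportedMiB            // memory "hidden" from maxMemory

  def main(args: Array[String]): Unit = {
    println(gapMiB) // 227.5 MiB, roughly the unusable survivor space
  }
}
```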
spark-submit --class com.getui.bigdata.spark_study.ReadData \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-memory 2g \
  --executor-cores 1 \
  --conf "spark.executor.extraJavaOptions=-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC" \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=/Users/mac/IdeaProjects/spark/conf/log4j.properties" \
  /Users/mac/IdeaProjects/namespace/spark/target/spark-0.1-independency.jar
{Heap before GC invocations=6 (full 1):
PSYoungGen total 105984K, used 105830K [0x0000000795580000, 0x000000079e500000, 0x00000007c0000000)
eden space 100864K, 100% used [0x0000000795580000,0x000000079b800000,0x000000079b800000)
from space 5120K, 96% used [0x000000079e000000,0x000000079e4d98f0,0x000000079e500000)
to space 10752K, 0% used [0x000000079d000000,0x000000079d000000,0x000000079da80000)
ParOldGen total 100864K, used 6714K [0x0000000740000000, 0x0000000746280000, 0x0000000795580000)
object space 100864K, 6% used [0x0000000740000000,0x000000074068ea60,0x0000000746280000)
metaspace used 32269K, capacity 33870K, committed 34176K, reserved 1079296K
class space used 4426K, capacity 4625K, committed 4736K, reserved 1048576K
10.143: [GC (Allocation Failure) [PSYoungGen: 105830K->7867K(136192K)] 112544K->14590K(237056K), 0.0078949 secs] [Times: user=0.02 sys=0.00, real=0.01 secs]
Heap after GC invocations=6 (full 1):
PSYoungGen total 136192K, used 7867K [0x0000000795580000, 0x000000079fb80000, 0x00000007c0000000)
eden space 125440K, 0% used [0x0000000795580000,0x0000000795580000,0x000000079d000000)
from space 10752K, 73% used [0x000000079d000000,0x000000079d7aeef8,0x000000079da80000)
to space 10752K, 0% used [0x000000079f100000,0x000000079f100000,0x000000079fb80000)
ParOldGen total 100864K, used 6722K [0x0000000740000000, 0x0000000746280000, 0x0000000795580000)
object space 100864K, 6% used [0x0000000740000000,0x0000000740690a60,0x0000000746280000)
metaspace used 32269K, capacity 33870K, committed 34176K, reserved 1079296K
class space used 4426K, capacity 4625K, committed 4736K, reserved 1048576K
}
7.157: [Full GC (metadata GC Threshold) [PSYoungGen: 6495K->0K(240640K)] [ParOldGen: 12872K->13889K(184320K)] 19368K->13889K(424960K), [metaspace: 54854K->54854K(1101824K)], 0.0745744 secs] [Times: user=0.37 sys=0.01, real=0.08 secs]
The young generation plus the old generation gives the heap size: 240640K + 184320K = 424960K.
References:
Spark1.6内存管理(二) 实例讲解:Spark管理页面中Storage Memory是如何计算的? (wisgood's CSDN blog)
JVM的内存划分及metaSpace VS PermGen (Jianshu)