栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark任务参数估算配置

spark任务参数估算配置

1.总体原则 1.Yarn配置

Yarn部署在单台服务器128G内存,32个核

如果一个服务器是5个核, executor-cores=5,则理论上num-executor<=6个核。5*6<=32

/hadoop-xxx/yarn-site.xml
修改yarn的运行分配的最大最小内存

yarn.scheduler.minimum-allocation-mb 
yarn.scheduler.maximum-allocation-mb
2.配置参数

    driver-memory

    driver端的内存消耗主要是以下内容:

    1.创建小规模的分布式数据集:使用 parallelize、createDataframe 等 API 创建数据集

    2.收集计算结果:通过 take、show、collect 等算子把结果收集到 Driver 端

    根据以上估算即可

    下面代码是预估driver端内存的执行计划

    val df: Dataframe = _
    df.cache.count
    val plan = df.queryExecution.logical
    val estimated: BigInt = spark
    .sessionState
    .executePlan(plan)
    .optimizedPlan
    .stats
    .sizeInBytes
    

    executor-cores

    1.每个executor的最大核数,一般在3-6之间比较合适.

    2.Executor 中并行计算任务数的上限是 spark.executor.cores 除以 spark.task.cpus ,暂且记为 #Executor-tasks,整个集群的并行计算任务数自然就是 #Executor-tasks 乘以集群内 Executors 的数量

    3.每个线程只能在同一时刻处理一个任务在一个分区上,因此,在运行时,线程、任务与分区是一一对应的关系。

    配置项 | 含义
    ---|---
    spark.cores.max| 集群范围内最大的CPU核数
    spark.executore.cores | 单个Executor内CPU核数
    spark.task.cpus | 单个任务消耗的CPU核数,默认为1
    spark.default.parallelism|未指定分区数是的默认并行度
    spark.sql.shuffle.partitions|数据关联、聚合操作中Reducer的并行度
    

    –num-executors
    一个job拥有多个少jvm进程。每个executor和kafka的一个parition对接

    executor-memory

    单个executor的内存

      如果yarn 内存配置不足,例如只有5个executor,100/5=20g则最大为20g
3.spark内存估算 1. spark内存模型如下图

内存区域划分堆内内存堆外内存
内存空间总大小spark.executor.memoryspark.memory.offHeap.size
Reserved Memory固定为300M
User Memory1-spark.memory.fraction
Storage Meomoryspark.memory.fraction*spark.memory.storageFracionspark.memory.storageFraction
Reserved Memoryfraction*(1-strageFraction)1-storeageFraction
2. 不同内存区域解释

Reserved Memory 用于存储 Spark 内部对象,300MUser Memory 用于存储用户自定义的数据结构(图中黄色区域)Execution Memory 用于分布式任务执行Storage Memory 则用来容纳 RDD 缓存和广播变量 3.内存争抢协议

如果对方的内存空间有空闲,双方就都可以抢占;对于 RDD 缓存任务抢占的执行内存,当执行任务有内存需要时,RDD 缓存任务必须立即归还抢占的内存,涉及的 RDD 缓存数据要么落盘、要么清除;对于分布式计算任务抢占的 Storage Memory 内存空间,即便 RDD 缓存任务有收回内存的需要,也要等到任务执行完毕才能释放。 4.内存估算方法

针对单个executor而言

设定spark.executor.memory为900M

spark.storage.storageFtaction=0.3

spark.memory.fraction=0.6

1.执行内存总大小(即图中蓝色+绿色部分)

执行内存总大小(M)=(spark.executor.memory-System.reserved.memory)* spark.memory.fraction=(900M—300M)*0.6=360 M

存储内存(Story Memory)=(spark.executor.memory-System.reserved.memory)*spark.memory.fraction* spark.storage.storageFtaction=(900M—300M)*0.6*0.3=108

执行内存(Execution Memory)=(spark.executor.memory-System.reserved.memory)*spark.memory.fraction* spark.storage.storageFtaction=(900M—300M)*0.6*0.7=252

2. 其他(图中黄色部分)

other=(spark.executor.memory-System.reserved.memory)*(1-spark.memory.fraction)=240 M

5. 简单内存估算方法:

Storage Memory估算:将要缓存到内存的RDD/Dataset/Dataframe或广播变量进行cache,然后在Spark WEBUI的Storage标签页下直接查看所有的内存占用,大致就对应Storage Memory。

Execution Memory估算:有了Storage Memory,因为默认情况下Execution Memory和Storage Memory占用Spark Memory的比例是相同的,这里可以将Execution Memory和Storage Memory设置为相同。

User Memory:如果应用中没有使用太多自定义数据类型,保持默认值即可;如果使用了很多自定义数据类型,按老师说的方式进行估算即可。

6. 堆外内存存在的意义

在于堆外堆内的空间互不share,也就是说,你的task最开始用堆外,用着用着发现不够了,这个时候即使堆内还有空闲,task也没法用,所以照样会oom 7.估计内存的原则

    执行内存总量为M ,Execution Memory+Storage Memory的剩余空间,最少为Execution Memory,最多为Storage Memory + Execution Memory线程数为N, spark.execution.cores 为N每个线程可以分配到的内存上下限,下限为M/N/2,上限为M/N,取到2是因为比例是默认比例spark.storage.storageFtaction为0.5

给定数据总量D,以及并行度P,P由spark.default.parallelism来控制。
主要是让数据分片的大小D/P坐落在(M/N/2,M/N)

8.CPU利用率观察

ganglia、Prometheus、Grafana、nmon、htop等等,这些工具,去观察你的CPU利用率,然后回过头来,平衡三者,然后再去观察CPU利用率是不是提升了,通过这种方式,来去调优

9. 解压文件

spark在加载parquet+snappy压缩文件时,它会考虑解压之后的文件大小吗?

Spark考虑的,不是数据在磁盘中的存储大小,是内存中的存储大小,所以恰恰是Parquet+Snappy解压后、在内存中的存储大小~

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/752652.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号