栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark submit——yarn模式参数调优

spark submit——yarn模式参数调优

spark官网上的参数默认值设置:https://spark.apache.org/docs/latest/configuration.html#spark-streaming

此处记录一下最近整理的spark 集群模式提交yarn的部分常用参数设置 (友情提示:以下代码块中注释部分未加注释标# )

spark-submit --master yarn-cluster                    yarn模式
        --name ${APP_NAME}                               appName
        --executor-memory 3G                               每个executor分配的内存 此处可参考上篇 executor内存分配图解
        --executor-cores 3                  每个executor分配的核数   核数*executor数 = task被执行的并发度=输入分片数=输入rdd分区数 但应限制在queue总分配核数以内                   
        --num-executors 12               集群中启动的executor总数
        --driver-memory 2G                 driver运行内存 默认值为1G
        --driver-cores  1                 driver核数 可以不用改

        --conf spark.yarn.executor.memoryOverhead=2048           executor堆外内存,值为 executorMemory * 0.1, 最小384    spark shuffle的底层传输方式是使用netty传输 netty在进行网络传输的过程中会申请堆外内存 


        --conf spark.yarn.driver.memoryOverhead=2048      driver堆外内存

        --conf "spark.shuffle.io.maxRetries=1"     shuffle read任务从shuffle write任务节点上拉取数据(netty)时失败重试次数  默认值为3 
        --conf "spark.reducer.maxSizeInFlight=36m"    用于设置shuffle read task的buffer缓冲大小 如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数
        --conf spark.shuffle.io.connectionTimeout=30     shuffle超时时间


        --conf "spark.ui.port=45041"          spark ui端口号  默认值为4040

         --conf "spark.ui.showConsoleProgress=false"       日志中是否打印出任务执行的进度条


        --conf "spark.streaming.receiver.maxRate=20000"                   限制每个 receiver 每秒最大可以接收的记录的数据 以防止excutor端出现数据积压问题    适用于spark1.5之前使用receiver的方式消费kafka的情况

       --conf "spark.streaming.kafka.maxRatePerPartition=1000"       设定对目标topic每个partition每秒钟拉取的数据条数   kafka0.10版本之后采用此直连的方式消费数据   

       --conf "spark.streaming.backpressure.enabled=true"      Spark 1.5 引入了反压(Back Pressure)机制,其通过动态收集系统的一些数据来调整spark对Kafka的消费速率 ,从而自动地适配集群数据处理能力   反压机制是结合以上两个配置一起使用的   另外还有一个速率估算器的配置  目前只有一种 即默认的基于pid的速率估算器   如果用户配置了 spark.streaming.receiver.maxRate 或 spark.streaming.kafka.maxRatePerPartition,那么最后到底接收多少数据取决于三者的最小值


        --conf "spark.dynamicAllocation.enable=true"         开启动态资源分配功能  根据业务实际需求动态增加或删除executor的数量  当task到来时,spark会根据启动间隔依次启动executor,如果资源充足,则每次按照spark.dynamicAllocation.sustainedSchedulerBacklogTimeout的值启动1,2,4…指数倍个executor,直至资源分配得到满足。如果executor的空闲间隔超过spark.dynamicAllocation.executorIdleTimeout设置的值(默认60s)的话,则该executor会被移除,除非内存里面有缓存数据。   (spark1.2开始)

        --conf "spark.shuffle.service.enabled=true"        表示在nodemanager上开启shuffle功能,只有这两个配置项都开启的时候,动态资源分配功能才算生效。

        --conf "spark.memory.fraction= 0.5"        表示spark Memory占整个usableMemory = systemMemory - Reserved Memory =spark Memory + user Memory的内存  默认值为0.75  spark2.X之后为0.6
        --conf "spark.memory.storageFraction=0.5"         spark1.6之后的配置(目前深圳这边的spark集群版本是1.6) 表示storage Memory占spark Memory(storage Memory+execution Memory)的比例  默认值为0.5

   
        --conf "spark.driver.extraJavaOptions=-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:MaxPermSize=512M     
                -XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps"        打印GC简单及详细信息


        --conf "spark.streaming.stopGracefullyonShutdown= true"       优雅地关闭程序 首先将Reciever关闭,不再接收新数据,然后将已经收下来的数据都处理完,最后再退出底层实现原理为Hadoop的ShutdownHook


        --conf "spark.locality.wait=1s"   spark的数据本地化策略是 数据在哪个节点上,就把Task分配到对应的节点上,这样就避免了不必要的网络传输。如果分配任务时数据所在节点没有足够的资源来执行本次任务,就等待spark.locality.wait秒 默认是3秒 3秒之后如果该节点空闲则省去网络传输   否则就把任务分配到别的节点执行


        --conf "spark.streaming.blockInterval=1000ms"     意思是每间隔多少秒后,Spark才把接收到的数据组成数据块存到Spark中 默认值200ms  假设实例化InputDStream时设置的Duration(batch interval)为1秒(1000ms),那么任务执行时,总共有 1000 / 200 = 5 个block,每个block将对应一个task。如果task的数量少于每台机器配置的core的数量,则说明资源没有被很好的利用。


        --jars ${LOCAL_LIB_PATH}/fastjson-1.2.7.jar,${LOCAL_LIB_PATH}/jedis-2.9.0.jar,${LOCAL_LIB_PATH}/commons-pool2-2.4.2.jar,${LOCAL_LIB_PATH}/commons-pool-1.5.4.jar,${LOCAL_LIB_PATH}/kafka-clients-2.0.0.jar,${LOCAL_LIB_PATH}/kafka_2.11-0.9.0.0.jar,${LOCAL_LIB_PATH}/java-utils-0.1.5.jar,${LOCAL_LIB_PATH}/scala-utils-0.0.3.jar        指定部分依赖包
        --class com.inveno.news.stat.UserStatisticalImpression ${LOCAL_LIB_PATH}/users_active_statistical-0.0.1.jar        指定任务的主类以及jar包
关于输入分片 inputSplit ,task ,executor,cores,rdd partition的关系梳理:

提交spark任务,在进行这些相关参数设置的时候,我们需要知道的关系是:

  1. 原始数据读入阶段,一个输入分片对应一个task,
    对RDD进行计算的时候,一个RDD分区对应一个task,所以rdd的分区数决定了task的总数;
  2. 一个task只会被分配到一个executor;
  3. 一个executor可以配置多个core;
  4. core相当于cpu,因此core数关系到任务的并行度,也就是多个task被同时分配到一个executor上的多个core并发执行,继而得出 parallelism_task = num_executors * executor_cores;

搞清楚这个关系后,我们可以从确定一个变量值开始推导所有相关参数的设置

例如,原始文件数据读入的情况下,如果没有相关参数的设置,形成的输入分片数极有可能是一个大到出乎意料的数值,这个时候,我们通过设置默认并行度,可以将分片数降到一个合适的值(比如合适的partition数或者任务可用总核数),具体的executor数以及core的设置,可以参考使用的yarn-queue的总分配核数,在不影响其他任务运行的情况下酌情设置。或者如果是kafka + sparkStreaming的情况下,则可以根据kafka topic的分区数来确定合适的任务并行度。

说明:事实上本菜鸟目前对以上几个参数调优没有什么经验,不知道到底应该先确定哪个参数,再去推导其他的参数,跪求有大佬路过时能不吝赐教,不胜感激!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/467255.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号