spark官网上的参数默认值设置:https://spark.apache.org/docs/latest/configuration.html#spark-streaming
此处记录一下最近整理的spark 集群模式提交yarn的部分常用参数设置 (友情提示:以下代码块中注释部分未加注释标# )
spark-submit --master yarn-cluster yarn模式
--name ${APP_NAME} appName
--executor-memory 3G 每个executor分配的内存 此处可参考上篇 executor内存分配图解
--executor-cores 3 每个executor分配的核数 核数*executor数 = task被执行的并发度=输入分片数=输入rdd分区数 但应限制在queue总分配核数以内
--num-executors 12 集群中启动的executor总数
--driver-memory 2G driver运行内存 默认值为1G
--driver-cores 1 driver核数 可以不用改
--conf spark.yarn.executor.memoryOverhead=2048 executor堆外内存,值为 executorMemory * 0.1, 最小384 spark shuffle的底层传输方式是使用netty传输 netty在进行网络传输的过程中会申请堆外内存
--conf spark.yarn.driver.memoryOverhead=2048 driver堆外内存
--conf "spark.shuffle.io.maxRetries=1" shuffle read任务从shuffle write任务节点上拉取数据(netty)时失败重试次数 默认值为3
--conf "spark.reducer.maxSizeInFlight=36m" 用于设置shuffle read task的buffer缓冲大小 如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数
--conf spark.shuffle.io.connectionTimeout=30 shuffle超时时间
--conf "spark.ui.port=45041" spark ui端口号 默认值为4040
--conf "spark.ui.showConsoleProgress=false" 日志中是否打印出任务执行的进度条
--conf "spark.streaming.receiver.maxRate=20000" 限制每个 receiver 每秒最大可以接收的记录的数据 以防止excutor端出现数据积压问题 适用于spark1.5之前使用receiver的方式消费kafka的情况
--conf "spark.streaming.kafka.maxRatePerPartition=1000" 设定对目标topic每个partition每秒钟拉取的数据条数 kafka0.10版本之后采用此直连的方式消费数据
--conf "spark.streaming.backpressure.enabled=true" Spark 1.5 引入了反压(Back Pressure)机制,其通过动态收集系统的一些数据来调整spark对Kafka的消费速率 ,从而自动地适配集群数据处理能力 反压机制是结合以上两个配置一起使用的 另外还有一个速率估算器的配置 目前只有一种 即默认的基于pid的速率估算器 如果用户配置了 spark.streaming.receiver.maxRate 或 spark.streaming.kafka.maxRatePerPartition,那么最后到底接收多少数据取决于三者的最小值
--conf "spark.dynamicAllocation.enable=true" 开启动态资源分配功能 根据业务实际需求动态增加或删除executor的数量 当task到来时,spark会根据启动间隔依次启动executor,如果资源充足,则每次按照spark.dynamicAllocation.sustainedSchedulerBacklogTimeout的值启动1,2,4…指数倍个executor,直至资源分配得到满足。如果executor的空闲间隔超过spark.dynamicAllocation.executorIdleTimeout设置的值(默认60s)的话,则该executor会被移除,除非内存里面有缓存数据。 (spark1.2开始)
--conf "spark.shuffle.service.enabled=true" 表示在nodemanager上开启shuffle功能,只有这两个配置项都开启的时候,动态资源分配功能才算生效。
--conf "spark.memory.fraction= 0.5" 表示spark Memory占整个usableMemory = systemMemory - Reserved Memory =spark Memory + user Memory的内存 默认值为0.75 spark2.X之后为0.6
--conf "spark.memory.storageFraction=0.5" spark1.6之后的配置(目前深圳这边的spark集群版本是1.6) 表示storage Memory占spark Memory(storage Memory+execution Memory)的比例 默认值为0.5
--conf "spark.driver.extraJavaOptions=-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:MaxPermSize=512M
-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps" 打印GC简单及详细信息
--conf "spark.streaming.stopGracefullyonShutdown= true" 优雅地关闭程序 首先将Reciever关闭,不再接收新数据,然后将已经收下来的数据都处理完,最后再退出底层实现原理为Hadoop的ShutdownHook
--conf "spark.locality.wait=1s" spark的数据本地化策略是 数据在哪个节点上,就把Task分配到对应的节点上,这样就避免了不必要的网络传输。如果分配任务时数据所在节点没有足够的资源来执行本次任务,就等待spark.locality.wait秒 默认是3秒 3秒之后如果该节点空闲则省去网络传输 否则就把任务分配到别的节点执行
--conf "spark.streaming.blockInterval=1000ms" 意思是每间隔多少秒后,Spark才把接收到的数据组成数据块存到Spark中 默认值200ms 假设实例化InputDStream时设置的Duration(batch interval)为1秒(1000ms),那么任务执行时,总共有 1000 / 200 = 5 个block,每个block将对应一个task。如果task的数量少于每台机器配置的core的数量,则说明资源没有被很好的利用。
--jars ${LOCAL_LIB_PATH}/fastjson-1.2.7.jar,${LOCAL_LIB_PATH}/jedis-2.9.0.jar,${LOCAL_LIB_PATH}/commons-pool2-2.4.2.jar,${LOCAL_LIB_PATH}/commons-pool-1.5.4.jar,${LOCAL_LIB_PATH}/kafka-clients-2.0.0.jar,${LOCAL_LIB_PATH}/kafka_2.11-0.9.0.0.jar,${LOCAL_LIB_PATH}/java-utils-0.1.5.jar,${LOCAL_LIB_PATH}/scala-utils-0.0.3.jar 指定部分依赖包
--class com.inveno.news.stat.UserStatisticalImpression ${LOCAL_LIB_PATH}/users_active_statistical-0.0.1.jar 指定任务的主类以及jar包
关于输入分片 inputSplit ,task ,executor,cores,rdd partition的关系梳理:
提交spark任务,在进行这些相关参数设置的时候,我们需要知道的关系是:
- 原始数据读入阶段,一个输入分片对应一个task,
对RDD进行计算的时候,一个RDD分区对应一个task,所以rdd的分区数决定了task的总数; - 一个task只会被分配到一个executor;
- 一个executor可以配置多个core;
- core相当于cpu,因此core数关系到任务的并行度,也就是多个task被同时分配到一个executor上的多个core并发执行,继而得出 parallelism_task = num_executors * executor_cores;
搞清楚这个关系后,我们可以从确定一个变量值开始推导所有相关参数的设置
例如,原始文件数据读入的情况下,如果没有相关参数的设置,形成的输入分片数极有可能是一个大到出乎意料的数值,这个时候,我们通过设置默认并行度,可以将分片数降到一个合适的值(比如合适的partition数或者任务可用总核数),具体的executor数以及core的设置,可以参考使用的yarn-queue的总分配核数,在不影响其他任务运行的情况下酌情设置。或者如果是kafka + sparkStreaming的情况下,则可以根据kafka topic的分区数来确定合适的任务并行度。
说明:事实上本菜鸟目前对以上几个参数调优没有什么经验,不知道到底应该先确定哪个参数,再去推导其他的参数,跪求有大佬路过时能不吝赐教,不胜感激!!



