-
避免创建重复的RDD
-
尽可能复用同一个RDD
-
对多次使用的RDD进行持久化
首选缓存策略是memory_only,如果RDD中的数据比较多,直接使用这种持久化级别,容易导致JVM的OOM内存溢出异常
其次再是MEMORY_AND_DISK_SER,SER会进行序列化,对数据进行压缩
-
尽量避免使用shuffle类算子
-
使用map-side预聚合的shuffle操作
-
使用高性能的算子
使用reducerByKey代替groupByKey,reducerByKey会在map端提前进行预聚合(适用于幂等计算)
使用mapPartitions代替map,转换算子,按分区来处理数据,当需要与外部建立连接时,使用mapPartitions可以减少连接次数,提高效率
使用foreachPartitions代替foreach
使用filter之后使用coalesce,过滤之后数据量减小,使用coalesce减少分区,可以不产生shuffle
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
repartition:coalesce(numPartitions,true) 增多分区使用这个
-
广播大变量
当在算子内部使用外部变量时,可以提高广播大变量的方式来减少网络传输的开销,并减少executor内存占比开销
广播变量的发送方式:executor一开始并没有广播变量,当task执行时需要用到广播变量,就会找blockmanager要,blockmanager会找Driver端的blockManagerMaster要
-
使用Kryo优化序列化性能
spark默认使用的是java序列化机制,但是Kryo序列化后的数据更小,更快
-
优化数据结构
在Spark编码实现中,特别是对于算子函数中的代码,尽 量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如 Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用 ,从而降低GC频率,提升性能。
-
使用高性能的库fastutil
在Spark Application提交后,Driver会根据action算子划分成一个个的job,然后对每一 个job划分成一个个的stage,stage内部实际上是由一系列并行计算的task组成的,然后 以TaskSet的形式提交给你TaskScheduler,TaskScheduler在进行分配之前都会计算出 每一个task最优计算位置。Spark的task的分配算法优先将task发布到数据所在的节点上 ,从而达到数据最优计算位置。
调等待时间的参数
num-executors
executor-memory
该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。每个Executor进程的内存设置4G8G较为合适。申请的总内存量最好不要超过资源队列最大总内存的1/31/2,避免你自己的Spark作业占用了队列所有的资源。
executor-cores
该参数用于设置每个Executor进程的CPU core数量。Executor的CPU core数量设置为2~4个较为合适。num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适。
driver-memory
该参数用于设置Driver进程的内存。如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。
spark.default.parallelism
该参数用于设置每个stage的默认task数量。设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的。
spark.storage.memoryFraction
该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。
spark.shuffle.memoryFraction
该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。
数据倾斜优化 使用Hive ETL预处理数据通过Hive来进行数据预处理,spark作业中就不需要使用shuffle类算子了,把数据倾斜的发生提前到了hive etl中
过滤少数导致倾斜的key适用场景及原理:导致倾斜的key就少数几个,如果那少数几个数据量特别多的key,对作业的执行和计算结果不是特别重要的话,那么干脆就直接过滤掉那少数几个key,自然不可能产生数据倾斜。
实现思路:可以使用sample算子对RDD进行采样,然后 计算出每个key的数量,取数据量最多的key过滤掉即可
提高shuffle操作的并行度原理:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个 task,从而让每个task处理比原来更少的数据
实现:在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如 reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于 Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。
双重聚合原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被 一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。 接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果
实现:进行两阶段聚合,第一次是局部聚合,先给每个key 都打上一个随机数,接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,然后将各个key的前缀给去掉,再次进行全局聚合操作,就可以得到最终结果了
将reduce join转为map join适用场景:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中 的一个RDD或表的数据量比较小
普通的join操作会触发shuffle过程,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。如果一个RDD是比较小的, 则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜
采样倾斜key并分拆join操作适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大。如果出现数据倾斜,是因为其中某一 个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均 匀,那么采用这个解决方案是比较合适的。



