Spark原理第一篇
1 观察执行流程
Explain 查看日志 也可以webUI 界面里面观察
2 资源调优
资源上限的估算 资源内存的估算 资源内存比例调整的估算 例如 sparkSQL 默认 shuffle 并行度是 200,将设数据量为 100 G,那么每个 task 的数据量为 100G/200 = 500 mb。 如果每个 executor core 数量为4,则需要的执行内存为 4*500 = 2G,执行内存占总内存百分之30,则可以算出每个节点需要的总内存为 2 * 10/3 G + 300 mb
3 缓存调优
缓存结合kryo序列化
4 CPU利用
一般推荐并行度 (task数量) 为并发度 ( core 数 * executor 数) 的2-3倍 如果task数量设置过大,可能会导致每个task数据量过小会全部分配到一个executor执行。 其他executor则不执行任务,造成cpu浪费
5 SQL语句优化
谓词下推,尽早过滤 列裁剪,只计算读取需要的列值 常量替换 CBO,自动计算所有物理计划代价然后选择 广播 join,来避免 shuffle SMB join,将数据相同key hash放入同一个bucket,拿到大表按照bucket合并后的多个小表数据再join
6 数据倾斜
1 单表查询倾斜:二次聚合,例如key加随机数 2 join查询倾斜:小表join大表,可以小表用广播join 3 大表join:拆分大key,打散大表,扩容小表 大表大key随机打散,例如大key 1可以随机变成 1-1,2-1,3-1。小表对应的1扩容成1-1,2-1,3-1然后join
Map端优化
map 端预聚合
读取小文件的优化
1 可以启动参数修改 --分区最大大小,默认128mb --文件打开开销。默认4mb,最好设置接近小文件的大小 每计算一个文件,除了文件本身大小,还会加上设置的文件打开开销值。当总和大于分区最大大小,就会写新的分区
2 调大 map 端 shuffleWrite溢写的输出缓冲流 初始内存为5mb,当使用内存不足,则当前使用内存 *2 例如当前使用到6mb,超过了初始分配的5mb,则申请6*2-5=7mb内存 当申请不到内存时,则通过默认32kb的输出流缓冲区进行溢写磁盘 spark.shuffle.file.buffer 输出流缓冲区 (默认32kb) 可以调大提高溢写磁盘效率
Reduce端优化
1 合理设置reduce数量:一般推荐并行度 (task数量) 为并发度 ( core 数 * executor 数) 的2-3倍 2 小文件过多: 可以用coalesce,repartition等方式调整分区 3 增大reduce缓冲区 (默认48mb,提高shuffle拉取数据效率),减少重试次数 (默认3次),重试等待间隔 (默认5秒) 4 合理利用bypass 设置使用bypass分数数阈值,bybass不会预聚合,数量不大时候会提高性能 默认bypass阈值是200
Job优化
调节数据本地化等待时长:executor选择策略的降级等待时间 使用堆外内存:当数据量极大时,可考虑堆外内存。让java gc,缓存交给堆外内存 调节ack等待连接时长:有可能数据太大,一直GC占用内存,导致了executor无内存拉取数据超时
Spark 3.0 AQE
动态合并分区,分区的数值调整 动态切换join,例如自动转换广播join 自动优化join倾斜 动态分区裁剪



