开发原则: 原则一:避免创建重复的RDD 原则二:尽可能复用同一个RDD 原则1和原则2 都要由原则3来配合使用 原则三:对多次使用的RDD进行持久化持久化的级别,要针对于具体应用场景来选择 原则四:尽量避免使用shuffle类算子 原则五:使用map-side预聚合的shuffle操作 原则六:使用高性能的算子 原则七:广播大变量 原则八:使用Kryo优化序列化性能 原则九:优化数据结构
解决方案一:使用Hive ETL预处理数据 解决方案二:过滤少数导致倾斜的key 解决方案三:提高shuffle操作的并行度 解决方案四:两阶段聚合(局部聚合+全局聚合) 解决方案五:将reduce join转为map join 解决方案六:采样倾斜key并分拆join操作 解决方案七:使用随机前缀和扩容RDD进行join 解决方案八:多种方案组合使用
局部聚合+全局聚合解决数据倾斜问题
package com.qf.sql.day03
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataframe, Row, SparkSession}
object _07TestDataSkew {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]").appName("data skew").getOrCreate()
import spark.implicits._
var rdd1 = spark.sparkContext.makeRDD(List("a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a","b,c,d,e,f","b,b,c,c,d,e,f,g","a,b,a,c,f"))
val df: Dataframe = rdd1.toDF("line")
df.createTempView("temp")
//先正常统计每个字符的个数
val sql =
"""
|select word,count(1)
|from
|(
|select explode(split(line,",")) word
|from temp) t1
|group by t1.word
|
|""".stripMargin
// spark.sql(sql).show()
println("----------------先在单词前面拼接随机数字,比如0,1,2,3-----------------------")
val sql1 =
"""
|select concat(floor(rand()*4),"-",word)
|from
|(
| select explode(split(line,",")) word
|from temp) t1
|
|""".stripMargin
// spark.sql(sql1).show()
println("----------------將加上前缀的单词,进行预聚合-----------------------")
val sql2 =
"""
|
|select prefix_word,count(1)
|from(
|select concat(floor(rand()*4),"-",word) prefix_word
|from
|(
| select explode(split(line,",")) word
|from temp) t1
|) t2
|group by prefix_word
|
|""".stripMargin
spark.sql(sql2).show()
println("----------------去掉前缀,进行全局聚合-----------------------")
val sql3 =
"""
|
|select substr(prefix_word,instr(prefix_word,"-")+1) w,sum(num)
|from
| (select prefix_word,count(1) num
| from(
| select concat(floor(rand()*4),"-",word) prefix_word
| from(
| select explode(split(line,",")) word
| from temp) t1
| ) t2
| group by prefix_word
| ) t3
|group by w
|""".stripMargin
spark.sql(sql3).show()
spark.stop()
}
}



