栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021-10-27

2021-10-27

Spark数据倾斜 局部聚合+全局聚合解决数据倾斜问题
开发原则:
原则一:避免创建重复的RDD       
原则二:尽可能复用同一个RDD       原则1和原则2 都要由原则3来配合使用
原则三:对多次使用的RDD进行持久化持久化的级别,要针对于具体应用场景来选择
原则四:尽量避免使用shuffle类算子
原则五:使用map-side预聚合的shuffle操作
原则六:使用高性能的算子
原则七:广播大变量
原则八:使用Kryo优化序列化性能
原则九:优化数据结构
解决方案一:使用Hive ETL预处理数据
解决方案二:过滤少数导致倾斜的key
解决方案三:提高shuffle操作的并行度
解决方案四:两阶段聚合(局部聚合+全局聚合)
解决方案五:将reduce join转为map join
解决方案六:采样倾斜key并分拆join操作
解决方案七:使用随机前缀和扩容RDD进行join
解决方案八:多种方案组合使用

局部聚合+全局聚合解决数据倾斜问题

package com.qf.sql.day03

import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Dataframe, Row, SparkSession}


object _07TestDataSkew {
    def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("data skew").getOrCreate()
        import spark.implicits._

        var rdd1 = spark.sparkContext.makeRDD(List("a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a","b,c,d,e,f","b,b,c,c,d,e,f,g","a,b,a,c,f"))
        val df: Dataframe = rdd1.toDF("line")
        df.createTempView("temp")
        //先正常统计每个字符的个数
        val sql =
            """
              |select word,count(1)
              |from
              |(
              |select explode(split(line,",")) word
              |from temp) t1
              |group by t1.word
              |
              |""".stripMargin
//        spark.sql(sql).show()

        

        println("----------------先在单词前面拼接随机数字,比如0,1,2,3-----------------------")

        val sql1 =
            """
              |select concat(floor(rand()*4),"-",word)
              |from
              |(
              | select explode(split(line,",")) word
              |from temp) t1
              |
              |""".stripMargin
//        spark.sql(sql1).show()


        println("----------------將加上前缀的单词,进行预聚合-----------------------")

        val sql2 =
            """
              |
              |select prefix_word,count(1)
              |from(
              |select concat(floor(rand()*4),"-",word) prefix_word
              |from
              |(
              | select explode(split(line,",")) word
              |from temp) t1
              |) t2
              |group by prefix_word
              |
              |""".stripMargin
        spark.sql(sql2).show()


        println("----------------去掉前缀,进行全局聚合-----------------------")

        val sql3 =
            """
              |
              |select substr(prefix_word,instr(prefix_word,"-")+1) w,sum(num)
              |from
              |     (select prefix_word,count(1) num
              |     from(
              |         select concat(floor(rand()*4),"-",word) prefix_word
              |         from(
              |             select explode(split(line,",")) word
              |             from temp) t1
              |         ) t2
              |     group by prefix_word
              |     ) t3
              |group by  w
              |""".stripMargin
        spark.sql(sql3).show()
        spark.stop()
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/350071.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号