栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

SparkSQL优化分析--saveAsTable

SparkSQL优化分析--saveAsTable

文章目录

问题点:优化点:参考:

环境 spark2.4.8 on yarn(hadoop2.4.5)

优化后

package com.bl.bigdata.cdp.execservice.service.batch.schedule.common

import com.bl.bigdata.cdp.execservice._
import com.bl.bigdata.cdp.execservice.utils.business.TableUtils
import com.bl.bigdata.cdp.execservice.utils.third.{DateUtil, Slf4jLogger}
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.{SaveMode, SparkSession}

object StoreUserLabels extends Slf4jLogger {

  def main(args: Array[String]): Unit = {
    val sparkConf = getSparkConf()
    implicit val spark: SparkSession = createSparkSession(sparkConf)

    buildStoreLabel

    spark.stop()
  }

  private def buildStoreLabel()(implicit spark: SparkSession) = {
    import spark.{sql, table}
    import spark.implicits._

    val latestDate = DateUtil.getCurrentDate()

    sql(
      s"""
         |SELECt member_id id, label, store_id
         |FROM (
         |SELECt member_id member_id, tagdetailid2 label, storelabel2 store_id FROM userlibrary.cdp_purchase_1 WHERe cdate='$latestDate' AND tagstorelabel3 IS NOT NULL AND storelabel2 IS NOT NULL
         |UNIOn ALL
         |SELECt member_id member_id, tagdetailid2 label, storelabel2 store_id FROM userlibrary.cdp_purchase_2 WHERe cdate='$latestDate' AND tagstorelabel3 IS NOT NULL AND storelabel2 IS NOT NULL
         |UNIOn ALL
         |SELECt member_id member_id, tagdetailid2 label, storelabel2 store_id FROM userlibrary.cdp_purchase_3 WHERe cdate='$latestDate' AND tagstorelabel3 IS NOT NULL AND storelabel2 IS NOT NULL
         |UNIOn ALL
         |SELECt member_id member_id, tagdetailid2 label, storelabel2 store_id FROM userlibrary.cdp_population WHERe cdate='$latestDate' AND tagdetailid2 IS NOT NULL AND storelabel2 IS NOT NULL
         |UNIOn ALL
         |SELECt member_id id, tagdetailid2 label, 's50' store_id FROM userlibrary.cdp_ubt WHERe cdate='$latestDate'
         |) t
         |GROUP BY store_id, member_id, label
         |""".stripMargin).
      repartition($"store_id", $"id").
      createOrReplaceTempView("tv_store_user_label")

    sql(
      """
        |SELECT id, label, -1 value, store_id
        |FROM tv_store_user_label
        |""".stripMargin).
      groupBy("store_id", "id").
      pivot("label").
      max("value").repartition($"store_id", rand).
      createOrReplaceTempView("tv_user_labels")

    sql(
      """
        |SELECT member_id id, black_flag
        |FROM procdata.cdp_member_info
        |WHERe black_flag='1'
        |""".stripMargin).
      createOrReplaceTempView("tv_user_black_flag")

    sql(
      """
        |SELECT   IF(black_flag IS NULL,0, CAST(black_flag AS INT)) AS black_flag,  u.*
        |FROM tv_user_labels u
        |LEFT JOIN tv_user_black_flag b
        |ON u.id=b.id
        |""".stripMargin).
      createOrReplaceTempView("store_user_labels_black_flag")

    sql(s"DROP TABLE IF EXISTS test.store_user_labels_tmp")

    table("store_user_labels_black_flag").
      where("1=2").
      write.
      partitionBy("store_id").
      saveAsTable("test.store_user_labels_tmp")

    table("store_user_labels_black_flag").
      coalesce(69).
      //      repartition(69, $"store_id", rand).
      write.
      mode(SaveMode.Overwrite).
      partitionBy("store_id").
      parquet("/user/hive/warehouse/test.db/store_user_labels_tmp")

    spark.sql("MSCK REPAIR TABLE test.store_user_labels_tmp")

    TableUtils.renameAndRetainPhysicsTable("test.store_user_labels_tmp", "test.store_user_labels")
  }
}

/*
export SPARK_HOME=/opt/spark-2.4.8-212

$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster 
--class com.bl.bigdata.cdp.execservice.service.batch.schedule.common.StoreUserLabels8 
--name cdp_StoreUserLabels_wsw6 --executor-cores 6 --executor-memory 33g --driver-memory 26g --conf spark.driver.maxResultSize=25g 
--conf spark.sql.pivotMaxValues=2900 
--conf spark.network.timeout=3600 --conf spark.executor.heartbeatInterval=10000 
--conf spark.sql.shuffle.partitions=248 
--conf spark.default.parallelism=248 
--conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true 
--conf spark.dynamicAllocation.minExecutors=3 --conf spark.dynamicAllocation.maxExecutors=16 
--conf spark.dynamicAllocation.initialExecutors=9 
--conf spark.shuffle.memoryFraction=0.4 
~/sven/cdpsupport.jar
问题点:

1 shuffle时大量数据
2 数据倾斜严重

优化点:

1 增加并行执行数量
2 修改动态资源提交
3 增加shuffle聚合内存
4 saveAsTable功能优化
5 join操作改为left join操作,但这个left join实际是broadcast
6 合并操作减少stage
7 repartition 操作修改为coalesce
8 适当进行干预,尽量防止数据倾斜
9 减少shuffle聚合操作后数据溢写到磁盘

参考:

6) spark.default.parallelism
参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。

参数调优建议:如果不设置这个参数, Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,此时可以充分地利用Spark集群的资源。针对数据交换的场景,建议此参数设置为1-10。

7) spark.storage.memoryFraction
参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。

参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。针对数据交换的场景,建议降低此参数值到0.2-0.4。

8) spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。针对数据交换的场景,建议此值设置为0.1或以下。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/753325.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号