栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

spark基础(spark实例)

spark基础(spark实例)

前言

本文隶属于专栏《大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见大数据技术体系


WHAT

CacheManager 是 Spark SQL 中内存缓存的管理者,在 Spark SQL 中提供对缓存查询结果的支持,并在执行后续查询时自动使用这些缓存结果。

数据使用 InMemoryRelation 中存储的字节缓冲区进行缓存。

这个关系是自动替换的查询计划,逻辑计划返回与最初缓存的查询相同的结果。

CacheManager 只能在 Spark SQL 内部使用。

CacheManager 通过 SharedState 在 SparkSessions 之间共享。

val spark: SparkSession = ...
spark.sharedState.cacheManager

CacheManager 可以是空的。

通过在 Spark 的 conf/log4j.properties 添加下面的配置可以查看 CacheManager 内部发生了什么?

log4j.logger.org.apache.spark.sql.execution.CacheManager=ALL

在触发缓存并且日志打印级别符合的情况下,会出现下面的打印日志:

Asked to cache already cached data.

怎么触发 CacheManager 管理缓存?
    Spark 开发人员可以使用 Spark SQL 的 cache 或者 persist 算子 或者 SQL 的cache table 来通过 CacheManager 管理缓存。Spark Core 的cache 或者 persist 算子和 CacheManager 没有关系。

缓存怎么卸载?
    使用 Dataset.unpersist 算子。执行 DropTableCommand 和 TruncateTableCommand 逻辑命令。CatalogImpl 请求 uncache 和 refresh 表或视图,dropTempView/dropGlobalTempView

缓存到底长啥样?

CacheManager 使用 CachedData 数据结构使用 LogicalPlan(结构化查询)和相应的 InMemoryRelation 逻辑运算符管理缓存结构化查询。

  @transient @volatile
  private var cachedData = IndexedSeq[CachedData]()

可以看到缓存本质上是一个 IndexedSeq


IndexedSeq

IndexedSeq表示保证不可变的索引序列。

索引序列支持恒定时间或接近恒定时间的元素访问和长度计算。

它们是根据用于索引和长度的抽象方法定义的。

索引序列不会给Seq添加任何新方法,但可以有效实现随机访问模式

IndexedSeq 的默认实现是一个 scala.Vector


CachedData

如果说IndexedSeq是一个容器的话,那么CachedData就是容器里面存放的数据。

我们看看CachedData的类定义。

case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)

可以看到CachedData底层就是一个LogicalPlan 和InMemoryRelation。

InMemoryRelation 封装了一个缓存构建器,使用它,当我们使用缓存数据的时候,就不会触发 job,并且可以实现缓存 RDD 的懒加载。


InMemoryRelation 还缓存了哪些配置?

spark.sql.inMemoryColumnarStorage.compressed (默认 enabled)spark.sql.inMemoryColumnarStorage.batchSize (默认 10000)输入数据的存储级别 (默认 MEMORY_AND_DISK)。优化过的物理查询计划 (在请求 SessionState 执行 analyzed logical plan 之后)。输入的表名。analyzed 查询计划的统计信息。


怎么判断查询是否已缓存?
  final def sameResult(other: PlanType): Boolean = this.canonicalized == other.canonicalized

可以看到,Spark 通过比较两个查询计划的canonicalized 是否相等来决定是否启用缓存。

那么,canonicalized 到底是什么呢?


canonicalized

我们知道实现同一种功能,不同开发人员使用的 SQL 语法都可能存在差异,此时,为了保证能够充分利用到已有的查询计划,我们需要针对不同的查询计划做一个规范化的处理,这就是canonicalized存在的意义。

canonicalized 是在 QueryPlan.scala 中被定义的

@transient final lazy val canonicalized: PlanType = {
    var plan = doCanonicalize()
    // 如果计划没有因规范化而更改,请复制一份,这样我们就不会更改原始计划的_isCanonicalizedPlan标志。  
    if (plan eq this) {
      plan = plan.makeCopy(plan.mapProductIterator(x => x.asInstanceOf[AnyRef]))
    }
    plan._isCanonicalizedPlan = true
    plan
  }
protected def doCanonicalize(): PlanType = {
    val canonicalizedChildren = children.map(_.canonicalized)
    var id = -1
    mapexpressions {
      case a: Alias =>
        id += 1
        // 作为表达式的根,Alias将始终采用任意的exprId,我们需要递增地从 0 开始分配 exprId,将其规范化以进行相等性测试。这个别名无关紧要,应该删除。
        val normalizedChild = QueryPlan.normalizeexpressions(a.child, allAttributes)
        Alias(normalizedChild, "")(ExprId(id), a.qualifier)

      case ar: AttributeReference if allAttributes.indexOf(ar.exprId) == -1 =>
        // 顶层的“AttributeReference”也可以用于像“Alias”这样的输出,我们应该也要使 exprId 正常化。
        id += 1
        ar.withExprId(ExprId(id)).canonicalized

      case other => QueryPlan.normalizeexpressions(other, allAttributes)
    }.withNewChildren(canonicalizedChildren)
  }
def normalizeexpressions[T <: expression](e: T, input: AttributeSeq): T = {
    e.transformUp {
      case s: Planexpression[QueryPlan[_] @unchecked] =>
        // 规范化子查询计划中的外部引用。
        val normalizedPlan = s.plan.transformAllexpressionsWithPruning(
          _.containsPattern(OUTER_REFERENCE)) {
          case OuterReference(r) => OuterReference(QueryPlan.normalizeexpressions(r, input))
        }
        s.withNewPlan(normalizedPlan)

      case ar: AttributeReference =>
        val ordinal = input.indexOf(ar.exprId)
        if (ordinal == -1) {
          ar
        } else {
          ar.withExprId(ExprId(ordinal))
        }
    }.canonicalized.asInstanceOf[T]
  }

通过上面的源码阅读可以得到以下的结论:
1. 规范化重点在于消除表面变化(区分大小写、交换操作顺序、ExprId 等)
2. 默认情况下规范化主要处理的是 ExprId。
3. 特殊情况下规范化需要重写 QueryPlan.doCanonicalize 方法。


Spark 3.3.0 版本总共有 21 个特殊的 QueryPlan 重写了QueryPlan.doCanonicalize 方法。

    HiveTableScanExecRowDataSourceScanExecSubqueryExecReusedExchangeExecFileSourceScanExecInMemoryTableScanExecAdaptiveSparkPlanExecReusedSubqueryExecSubqueryAliasSubqueryAdaptiveBroadcastExecSubqueryBroadcastExecInMemoryRelationHiveTableRelationViewRangeExecQueryStageExecBroadcastExchangeExecJoinLogicalRelationResolvedHintBatchScanExec

遍历了上面 21 种特殊查询计划的源码后,可以很明显的得出下面的结论:

1. 规范化更多的是对当前查询计划的副本进行操作
2. 规范化在不同的场景下只会关注某些特定属性,即这些属性一致我们也会认为这些查询计划是同一个,在 CacheManager 中将会得到重用。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/771420.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号