栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Databricks 企业版 Spark&Delta Lake 引擎助力 Lakehouse 高效访问

Databricks 企业版 Spark&Delta Lake 引擎助力 Lakehouse 高效访问

作者

李锦桂(锦犀),阿里云开源大数据平台开发工程师

王晓龙(筱龙),阿里云开源大数据平台技术专家

1

背景介绍   

Databricks 是全球领先的 Data+AI 企业,是 Apache Spark 的创始公司,也是 Spark 的最大代码贡献者,核心围绕 Spark、Delta Lake、MLFlow 等开源生态打造企业级 Lakehouse 产品。2020年,Databricks 和阿里云联手打造了基于 Apache Spark 的云上全托管大数据分析& AI 平台——Databricks 数据洞察(DDI,Databricks DataInsight),为用户提供数据分析、数据工程、数据科学和人工智能等方面的服务,构建一体化的 Lakehouse 架构。

Delta Lake 是 Databricks 从2016年开始在内部研发的一款支持事务的数据湖产品,于2019年正式开源。除了社区主导的开源版 Delta Lake OSS,Databricks 商业产品里也提供了企业版 Spark&Detla Lake 引擎,本文将介绍企业版提供的产品特性如何优化性能,助力高效访问 Lakehouse。

2

针对小文件问题的优化解法   

在 Delta Lake 中频繁执行 merge, update,insert 操作,或者在流处理场景下不断往 Delta 表中插入数据,会导致 Delta 表中产生大量的小文件。小文件数量的增加一方面会使得 Spark 每次串行读取的数据量变少,降低读取效率,另一方面,使得 Delta 表的元数据增加,元数据获取变慢,从另一个维度降低表的读取效率。

为了解决小文件问题,Databricks 提供了三个优化特性,从避免小文件的产生和自动/手动合并小文件两个维度来解决 Delta Lake 的小文件问题。

特性1:优化 Delta 表的写入,避免小文件产生

在开源版 Spark 中,每个 executor 向 partition 中写入数据时,都会创建一个表文件进行写入,最终会导致一个 partition 中产生很多的小文件。

Databricks 对 Delta 表的写入过程进行了优化,对每个 partition,使用一个专门的 executor 合并其他 executor 对该 partition 的写入,从而避免了小文件的产生。

该特性由表属性 delta.autoOptimize.optimizeWrite 来控制:

可以在创建表时指定

CREATE TABLE student (id INT, name STRING)
TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

也可以修改表属性

ALTER TABLE table_name
SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true);

该特性有两个优点:

通过减少被写入的表文件数量,提高写数据的吞吐量;

避免小文件的产生,提升查询性能。

其缺点也是显而易见的,由于使用了一个 executor 来合并表文件的写入,从而降低了表文件写入的并行度,此外,多引入的一层 executor 需要对写入的数据进行 shuffle,带来额外的开销。因此,在使用该特性时,需要对场景进行评估:

该特性适用的场景:频繁使用 MERGE,UPDATE,DELETE,INSERT INTO,CREATE TABLE AS SELECT 等 SQL 语句的场景;

该特性不适用的场景:写入 TB 级以上数据。

特性2:自动合并小文件

在流处理场景中,比如流式数据入湖场景下,需要持续的将到达的数据插入到 Delta 表中,每次插入都会创建一个新的表文件用于存储新到达的数据,假设每10s触发一次,那么这样的流处理作业一天产生的表文件数量将达到8640个,且由于流处理作业通常是 long-running 的,运行该流处理作业100天将产生上百万个表文件。这样的 Delta 表,仅元数据的维护就是一个很大的挑战,查询性能更是急剧恶化。

为了解决上述问题,Databricks 提供了小文件自动合并功能,在每次向 Delta 表中写入数据之后,会检查 Delta 表中的表文件数量,如果 Delta 表中的小文件(size < 128MB 的视为小文件)数量达到阈值,则会执行一次小文件合并,将 Delta 表中的小文件合并为一个新的大文件。

该特性由表属性 delta.autoOptimize.autoCompact 控制,和特性 delta.autoOptimize.optimizeWrite 相同,可以在创建表时指定,也可以对已创建的表进行修改。自动合并的阈值由 spark.databricks.delta.autoCompact.minNumFiles 控制,默认为50,即小文件数量达到50会执行表文件合并;合并后产生的文件最大为128MB,如果需要调整合并后的目标文件大小,可以通过调整配置 spark.databricks.delta.autoCompact.maxFileSize 实现。

特性3:手动合并小文件

自动小文件合并会在对 Delta 表进行写入,且写入后表中小文件达到阈值时被触发。除了自动合并之外,Databricks 还提供了 Optimize 命令使用户可以手动合并小文件,优化表结构,使得表文件的结构更加紧凑。在实现上 Optimize 使用 bin-packing 算法,该算法不但会合并表中的小文件,且合并后生成的表文件也更均衡(表文件大小相近)。例如,我们要对 Delta 表 student 的表文件进行优化,仅需执行如下命令即可实现:

OPTIMIZE student;

Optimize 命令不但支持全表小文件的合并,还支持特定的分区的表文件的合并,例如,我们可以仅对 date 大于2017-01-01的分区中的小文件进行合并:

OPTIMIZE student WHERe date >= '2017-01-01'

从 Databricks 数据洞察产品上的试验数据看,Optimize 能使查询性能达到 8x 以上的提升。

3

媲美企业级数据库的查询优化技术   

Databricks 在数据查询方面也做了诸多优化:

特性1:Data Skipping

在数据查询系统中,有两种经典的查询优化技术:一种是以更快的速度处理数据,另一种是通过跳过不相关的数据,减少需要扫描的数据量。Data Skipping 属于后一种优化技术,通过表文件的统计信息跳过不相关的表文件,从而提升查询性能。

在向 Delta 表中新增表文件时,Delta Lake 会在 Delta 表的元数据中存储该表文件中的数据前32列的统计信息,包括数据列的最大最小值,以及为 null 的行的数量,在查询时,Databricks 会利用这些统计信息提升查询性能。例如:对于一张 Delta 表的 x 列,假设该表的一个表文件 x 列的最小值为5,最大值为10,如果查询条件为 where x < 3,则根据表文件的统计信息,我们可以得出结论:该表文件中一定不包含我们需要的数据,因此我们可以直接跳过该表文件,减少扫描的数据量,进而提升查询性能。

Data Skipping 的实现原理和布隆过滤器类似,通过查询条件判断表文件中是否可能存在需要查询的数据,从而减少需要扫描的数据量。如果表文件不可能存在查询的数据,则可以直接跳过,如果表文件可能存在被查询的数据,则需要扫描表文件。

为了能尽可能多的跳过和查询无关的表文件,我们需要缩小表文件的 min-max 的差距,使得相近的数据尽可能在文件中聚集。举一个简单的例子,假设一张表包含10个表文件,对于表中的 x 列,它的取值为[1,10],如果每个表文件的 x 列的分布均为[1, 10],则对于查询条件:where x < 3,无法跳过任何一个表文件,因此,也无法实现性能提升,而如果每个表文件的 min-max 均为0,即在表文件1的 x 列分布为[1, 1],表文件2的x列分布为[2, 2]...,则对于查询条件:where x < 3,可以跳过80%的表文件。受该思想的启发,Databricks 支持使用 Z-Ordering 来对数据进行聚集,缩小表文件的 min-max 差距,提升查询性能。下面我们介绍 Z-Ordering 优化的原理和使用。

特性2:Z-Ordering 优化

如上一节所解释的,为了能尽可能多的跳过无关的表文件,表文件中作为查询条件的列应该尽可能紧凑(即 min-max 的差距尽可能小)。Z-Ordering 就可以实现该功能,它可以在多个维度上将关联的信息存储到同一组文件中,因此确切来说,Z-Ordering 实际是一种数据布局优化算法,但结合 Data Skipping,它可以显著提升查询性能。

Z-Ordering 的使用非常简单,对于表 events,如果经常使用列 eventType 和 generateTime 作为查询条件,那么执行命令:

OPTIMIZE events ZORDER BY (eventType, generateTime)

Delta 表会使用列 eventType 和 generateTime 调整数据布局,使得表文件中 eventType 和 generateTime 尽可能紧凑。

根据我们在 Databricks DataInsight 上的试验,使用 Z-Ordering 优化能达到40倍的性能提升,具体的试验案例参考文末 Databricks 数据洞察的官方文档。

特性3:布隆过滤器索引

布隆过滤器也是一项非常有用的 Data-skipping 技术。该技术可以快速判断表文件中是否包含要查询的数据,如果不包含就及时跳过该文件,从而减少扫描的数据量,提升查询性能。

如果在表的某列上创建了布隆过滤器索引,并且使用 where col = "something" 作为查询条件,那么在扫描表中文件时,我们可以使用布隆过滤器索引得出两种结论:文件中肯定不包含 col = "something" 的行,或者文件有可能包含 col = "something" 的行。

当得出文件中肯定不包含 col =     "something" 的行的结论时,就可以跳过该文件,从而减少扫描的数据量,提升查询性能。

当得出文件中可能包含 col =     "something" 的行的结论时,引擎才会去处理该文件。注意,这里仅仅是判断该文件中可能包含目标数据。布隆过滤器定义了一个指标,用于描述出现判断失误的概率,即判断文件中包含需要查询的数据,而实际上该文件并不包含目标数据的概率,并称之为 FPP(False Positive Probability: 假阳性概率)。

Databricks 支持文件级 Bloom 过滤器,如果在表的某些列创建了布隆过滤器索引,那么该表的每个表文件都会关联一个 Bloom 筛选器索引文件,索引文件存储在表文件同级目录下的  _delta_index 子目录中。在读取表文文件之前,Databricks 会检查索引文件,根据上面的步骤判断表文件中是否包含需要查询的数据,如果不包含则直接跳过,否则再进行处理。

布隆过滤器索引的创建和传统数据库索引的创建类似,但需要指定假阳性概率和该列可能出现的值的数量:

CREATE BLOOMFILTER INDEX ON TABLE table_name
FOR COLUMNS(col_name OPTIONS (fpp=0.1, numItems=50000000))

根据我们在 Databricks DataInsight 上的试验,使用布隆过滤器索引能达到3倍以上的性能提升,试验案例参考文末 Databricks 数据洞察的官方文档。

特性4:动态文件剪枝

动态文件剪枝(Dynamic File Pruning, DFP)和动态分区剪枝(Dynamic Partition Pruning)相似,都是在维表和事实表的 Join 执行阶段进行剪枝,减少扫描的数据量,提升查询效率。

下面我们以一个简单的查询为例来介绍 DFP 的原理:

SELECT sum(ss_quantity) FROM store_sales 
JOIN item ON ss_item_sk = i_item_sk
WHERe i_item_id = 'AAAAAAAAICAAAAAA'

在该查询中,item 为维表(数据量很少),store_sales 为事实表(数据量非常大),where 查询条件作用于维表上。如果不开启 DFP,那么该查询的逻辑执行计划如下:

从上图可以看出,先对 store_sales 进行全表扫描,然后再和过滤后的 item 表的行进行 join,虽然结果仅有4万多条数据,但却扫描了表 store_sales 中的80多亿条数据。针对该查询,很直观的优化是:先查询出表 item 中 i_item_id = 'AAAAAAAAICAAAAAA' 数据行,然后将这些数据行的 i_item_sk 值作为表 store_sales 的 ss_item_sk 的查询条件在表 store_sales 的 SCAN 阶段进行过滤,结合我们在上面介绍的 Data Skipping 技术,可以大幅减少表文件的扫描。这一思路正是 DFP 的根本原理,启动 DFP 后的逻辑执行计划如下图所示:

可以看到,在启用 DFP 之后,过滤条件被下推到 SCAN 操作中,仅扫描了600多万条 store_sales 中的数据,从结果上看,启动 DFP 后,该条查询实现了10倍的性能提升,此外, Databricks 还针对该特性对 TPC-DS 测试,测试发现启用 DFP 后,TPC-DS 的第15条查询达到了8倍的性能提升,有36条查询实现了2倍及以上的性能提升。

4

总结   

前文概括介绍了 Databricks 企业版 Delta Lake 的性能优势,借助这些特性能够大幅提升 Spark SQL 的查询性能,加快 Delta 表的查询速度。Databricks 基于企业版 Lakehouse 架构为众多企业提供了价值。


加入我们

阿里云计算平台事业部开源大数据生态企业团队负责阿里云上大数据开源生态方向的商业化产品接入,合作伙伴包括 Databricks、Confluent、Cloudera、Starburst 等开源领域的明星级企业,目前团队在火热招聘中,期待你的加入,邮箱请联系zongze.hzz@alibaba-inc.com。

参考文档和试验数据

https://help.aliyun.com/document_detail/190745.html


我们会在钉群推送精彩文章,邀请技术大牛直播分享。
欢迎钉钉扫码加入交流群一起参与讨论~

戳“阅读原文”享599元优惠试用!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/759046.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号