本文假定读者对执行计划,普通 join 执行,Map join,Bucket Map join,SMB((Sort Merge Bucket) join, Skew Join 的执行过程比较熟悉。
背景知识Hive 执行计划解释示例
Hive Bucket Table 的功能和使用方法详解
Hive 使用 List Bucketing 表解决数据倾斜问题
基于规则的优化(Rule based Optimization):预先定义一些规则,按这些规则进行优化,效率更高。
不同的 SQL 处理器根据不同的引擎有不同的规则。如 Oracle 等传统数据库有使用索引的很多规则,这些规则在大数据框架里不适用。大数据计算框架里也有很多规则,传统的数据库不适用。
常见的基于规则的优化的规则有:谓词下推、常量上推和 Limit 限定。
PPD – Predicate Push Down(谓词下推):谓词下推包含两个内容,第 1 个是检索条件下推,第 2 个是查询字段下推。举例如下:
orders: 订单表,每个订单一条记录
lineitem:订单明细表,每个订单中每个物品一条记录,按 orderkey 和 orders 表关联。
select c.name, sum(quatity),sum(totalprice)
from (
select l.quatity, l.totalprice, o.custkey
from lineitem l join orders o
on (l.orderkey = o.orderkey)
) temp join customer c on (temp.custkey = c.custkey)
where temp.o_order_date='20210101' and temp.custkey='001'
group by c.name ;
以上查询是查询某一个客户编号的客户名称,和 2021 年买的所有商品的数量和金额。
检索条件下推:检索条件下推就是尽量把检索条件尽可能的推到里面。
谓词下推后的操作类似于以下 SQL:
select c.name, sum(quatity),sum(totalprice)
from (
select l.quatity, l.totalprice, o.custkey
from lineitem l
join
(select *
from orders
where o_order_date='20210101' and custkey='001'
) o
on (l.orderkey = o.orderkey)
) temp join customer c on (temp.custkey = c.custkey)
group by c.name ;
谓词下推后,orders 表中参加 join 计算的记录数减少,提高了计算效率。
查询字段下推查询字段下推是指尽量在表扫描中查询需要的字段。
优点:
- 列存储的表可以只从文件里读取相应的列,提高读取速度。可以减少参加计算的数据量,提高计算速度。
select c.name, sum(quatity),sum(totalprice)
from (
select l.quatity, l.totalprice, o.custkey
from lineitem l join orders o
on (l.orderkey = o.orderkey)
) temp join customer c on (temp.custkey = c.custkey)
where temp.o_order_date='20210101' and temp.custkey='001'
group by c.name ;
如果在表扫描中不是检索所有的字段,而是仅检索需要的字段。需要的字段包括 select 中的字段,谓词判断中用到的字段,关联条件用到的字段,group by 用到的字段。
在表扫描的时候只检索的字段如下:
lineitem: orderkey, quatity,totalprice。
orders: orderkey,o_order_date,custkey。
customer:custkey,name 字段。
同样的道理,如果子查询中查询的字段,在上层查询用不到,在子查询中也可以去掉。
常量上推常量上推就是把常量推到上面的查询中,越往后计算越好。
select a_c1, b_c1, name, xxx from ( select a.a_c1,b.b_c1,'fix value' name from a join b on xxx where xxx ) t join c on xxx where xxx
‘fix value’ 这个字段在子查询中是一个常量,可以推到上面的查询中,改写之后SQL以下 :
select a_c1, b_c1, 'fix value' name, xxx from ( select a.a_c1,b.b_c1 from a join b on xxx where xxx ) join c on xxx where xxx检索条件的传递
以下语句查询 某个客户2021年1月1日的订单数量。
select count(1) cnt from orders o join customer c on (o.custkey = c.custkey) where o.o_order_date='20210101' and c.custkey='001';
在遍历 orders 的记录时,自动加上 custkey = ‘001’ 的条件。
Limit 限定select * from web_sales join web_site on ws_web_site_sk = web_site_sk and web_site_sk=10 limit 10;
由于最多只保留 10 个结果,可以让每个 reduce 都限定输出 10 条数据后结束。
基于代价的优化基于代价的优化是比较不同的执行计划,选择代价低的执行计划执行。
设置 hive.cbo.enable=true 开启基于代价的优化。
基于代价的优化用于 JOIN 之间的排序和 join 算法的选择,依靠统计信息。
JOIN 顺序对执行效率有比较大的影响,举例说明如下:
select c.name, sum(quatity),sum(totalprice)
from (
select l.quatity, l.totalprice, o.custkey
from lineitem l join orders o
on (l.orderkey = o.orderkey)
) temp join customer c on (temp.custkey = c.custkey)
where temp.custkey='001'
group by c.name ;
如 lineitem 有 10 万条记录,orders有 1 万条记录。customer 有100 条,按 custkey=‘001’ 过滤后剩 1 条。
方案 1
inter1 = lineitem JOIN orders ,inter1 有10 万条记录。
result = inter1 join customer, 10 万条 记录 join 1 条 得到结果方案 2
inter1 = customer join orders,结果仅有 100 条
result = inter1 join lineitem
方案 1 第 1 步两个大表进行 join 计算,需要两个表的数据都需要 shuffle 到 reduce。
方案 2 每一个 join 都是一个大表关联一个小表,都可以用 map join 完成。
基于规则的优化考虑以下因素:
- CPU 使用量IO 使用量计算产生的记录数(不同的算法,产生的中间计算结果的数量不相等)记录平均大小
记录平均大小和记录数用来估计存储需要的内存,可以决定是否可以采用某些 join 算法,如 Map/ Bucket join。
Calcite 默认使用火山优化器(Volcano optimizer)来计算不同执行计划的代价。火山代价(VolcanoCost)仅用记录数来评判执行计划是否高效。
HiveCost 是 Hive 的代价实现,该代价包括 CPU, I/O, 记录数和记录平均大小。HiveCost 认为 CPU + I/O 优先级更高。
计算代价时使用的代价变量Hr - 从 HDFS 读 1 字节花费的代价(纳秒数)Hw - 往 HDFS 写 1 字节花费的代价(纳秒数)Lr - 从本地文件系统读 1 字节花费的代价(纳秒数)Lw - 往本地文件系统写 1 字节花费的代价(纳秒数)NEt - 在网络上任意两个节点传输 1 个字节的代价(纳秒数)T® - 关系 R 记录数Tsz - 记录的平均大小V(R, a) - 关系 R 中 属性 a 不同值的数量。CPUc - 做一次比较的 cpu 代价(纳秒数) 代价计算假设
- 磁盘, HDFS, 网络读写比其他代价更高
- 计算 I/O 代价时,不考虑硬件的类型,一次 IO 操作的数据大小,顺序读写还是随机读写,磁盘一个 block 的大小,存储分布情况。
CPUc = 1 纳秒NEt = 150 * CPUc 纳秒Lw = 4 * NEtLr = 4 * NEtHw = 10 * LwHr = 1.5 * Lr 各种操作的代价
参考 HiveOnTezCostModel.java
Table Scan 的代价T® = 记录数
Tsz = 记录平均大小
V(R, a) = 从 meta 获取
CPU Usage = 0
IO Usage = Hr * T® * Tsz. (从 HDFS 读一字节的开销 * 记录数 * 平均记录大小)
CPU Usage = T® * CPUc
IO Usage = 0
CPU Usage = 0
IO Usage = 0
R1 join R2
CPU Usage = 每张表的排序代价 + 两个排序流的 merge 代价 = [(T(R1) * Log(T(R1) * CPUc + T(R2) * Log(T(R2) * CPUc) ] + [(T(R1) + T(R2) ) * CPUc]
IO Usage = Map 写 shuffle 数据到本地文件系统,Map 从本地文件系统读 shuffle 数据, shuffle 数据 通过网络从 Map 服务器传输到 reduce 所在的服务器 = Lw * (T(R1) * Tsz1 + T(R2) * Tsz2) + Lr * (T(R1) * Tsz1 + T(R2) * Tsz2) + NEt * (T(R1) * Tsz1 + T(R2) * Tsz2)
Map JoinR1 JOIN R2,R1是小表,R2 是大表
CPU Usage = 小表构建 HashTable 开销 + 大表 Join 的开销 = T(R1) + T(R2) * CPUc
IO Usage = 传输一次小表的代价 * Map 的数量 = NEt * (T(R1) * Tsz1) * Map 的数量
假定大表数据在本地,不需要传输。存算分离场景是否也要计算大表 IO 开销?
R1 JOIN R2 on R1.c1 = R2.c2。R1 按 c1 bucket,R2 按c2 bucket。
一张表的 bucket 的数量是另一张的整数倍。
CPU Usage = 小表构建 HashTable 开销 + 大表 Join 的开销 = T(R1) * CPUc + T(R2) * CPUc
IO Usage 和 Map Join 一样
数据都有序,小表不需要构建 Hash Table。
CPU Usage = (T(R1) + T(R2)) * CPUc
IO Usage 和 Map Join 一样
查询重写为两个 join 的结果 union 在一起。计算各自 join 的代价
Distinct/ Group byCPU Usage = 排序的代价 + 分组的代价 = (T® * log T®) * CPUc



