join实践: 万亿级数据量任务优化历程
单字段去重先看一个简单的sql ,pv_id 去重计数
SELECt visit_type, count(DISTINCT pv_id) as pv_cnt from exp_table where ds=20220320 group by visit_type;
在默认情况下,相同的visit_type 的pv_id 会被分配到同一个reducer中处理,如果某个visit_type的数据量特别大,那么对应的reducer执行耗时会比较久或者可能会发生OOM,因此常规优化方式是:
select visit_type,count(*) from ( SELECt visit_type,pv_id from exp_table where ds=20220320 group by visit_type,pv_id ) group by visit_type;
也就是将count distinct 转换为 group by 操作,第一层根据visit_type,pv_id分组,第二层根据visit_type 直接求和即可,使数据分布更加均匀。但是 这种方式在第二层group by 也可能会产生大量的数据shuffle操作,可以再次优化:
select visit_type,sum(cnt) from ( SELECt visit_type, count(distinct pv_id) as cnt from exp_table where ds=20220320 group by visit_type,hash(pv_id)%50 ) group by visit_type;
第一层使用visit_type+hash(pv_id)%50 方式分组,对相同visit_type下的pv_id分了50组,保证相同pv_id 都能分配到相同的reducer中去,然后执行去重计数(cnt)操作,然后在第二层中根据visit_type 分组,对cnt求和即可。这种方式在第二层shuffle过程中数据就会相对减少很多。
多字段去重SELECt visit_type, count(distinct pv_id), count(distinct item_id) from exp_table where ds=20220320 group by visit_type;
这次同时需要对pv_id与item_id去重计数,如果还是按照上述的优化方式将visit_type、pv_id、item_id组合很显然已经行不通了,没办法保证相同的session_id或者item_id都会分配在同一个reducer中去。先使用常规意义上的操作:
SELECt a.visit_type ,a.cnt1 ,b.cnt2 FROM ( SELECt visit_type ,count(*) AS cnt1 FROM ( SELECt visit_type ,pv_id FROM exp_table WHERe ds = 20220320 GROUP BY visit_type ,pv_id ) GROUP BY visit_type ) a join ( SELECt visit_type ,count(*) AS cnt2 FROM ( SELECt visit_type ,item_id FROM exp_table WHERe ds = 20220320 GROUP BY visit_type ,item_id ) GROUP BY visit_type ) b ON a.visit_type = b.visit_type ;
也就是先拆分再join, 很显然这种方式开发难度大,特别是在处理字段更多的情况下。再重新按照单字段优化方式思考,希望按照所有的去重字段组合的情况下,仍然能够保证相同pv_id或者item_id都会分配在同一个reducer中去处理, 也是pv_id与item_id各自不影响其分配方式,可以采取先扩充数据,即将每一条数据扩充到去重字段个数的倍数,并且保证一个去重的字段不为空,并且增加标识字段,表明去重的列,如下图:
扩充后的数据执行常规的去重操作,即然后组合去重字段分组然后最外层进行汇总,由于扩充之后的数据每一条只有一个不为空的列,那么在执行shuffle 的时候,相同的pv_id或者item_id一定会分配在同一个reducer中去处理。数据扩充使用udtf实现:
@Override
public void process(Object[] args) throws UDFException {
// TODO
for(int i=0;i
具体优化sql:
SELECt visit_type ,count(CASE WHEN TYPE='flag0' THEN 1 END) AS pv_cnt ,count(CASE WHEN TYPE='flag1' THEN 1 END) AS item_cnt FROM ( SELECt visit_type ,pv_id1 ,item_id1 ,type FROM ( SELECt visit_type ,pv_id1 ,item_id1 ,type FROM exp_table LATERAL VIEW ExpandHash(pv_id,item_id) tmp AS pv_id1,item_id1,type WHERe ds = 20220320 ) GROUP BY visit_type ,pv_id1 ,item_id1 ,type ) GROUP BY visit_type
这种方式导致了数据量翻倍,在shuffle阶段io 也会耗时增加,具体耗时、资源消耗以实际情况为准,然后去做均衡具体选择哪一种方式。
思考
Q: 同时存在count distinct 与 sum 类的聚合该如何优化倾斜问题?
历史推荐
AliExpress 基于Flink的实时数仓建设
Flink 程序设计之道
数仓指标一致性
关于Event-Time 所带来的的问题
不得不掌握的三种BitMap



