MapReduce程序执行时,大部分Reduce节点都执行完毕,但是有一个或几个Reduce节点执行非常慢,导致整个任务处理时间过长。这是因为某个key的条数比其他key多很多,这个key所在的Reduce节点所处理的数据量比其他节点就大很多,才会导致某几个节点一直运行不完。这种情况就称为数据倾斜。
常见的原因- key分布不均匀
- 业务数据本身的特性
- 建表时考虑不周
- 某些SQL语句本身就有数据倾斜
设置hive.groupby.skewindata=true。这样生成的查询计划会有两个MapReduce job。
第一个MR job的Map输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作并输出结果。当然,相同的key有可能被分发到不同的Reduce中,从而达到负载均衡的目的。
第二个MR job再将预处理过的结果集合按照Group By Key分布到Reduce中,这样就能保证相同的Group By Key被分到同个Reduce中,完成最终的聚合操作。
2)Map端聚合设置hive.map.aggr=true。Map端部分聚合就相当于Combiner。
3)业务数据本身的特性问题比如日志数据有可能丢失,可能有大量的null值,可以将这些null值随机赋予一个key,再将其分发到不同的reduce。如果是业务数据分布不均,比如热门产品跟冷门产品的日志,那么可以设置合理的reduce个数来解决。
4)map join允许在Map阶段进行join操作,MapJoin会把小表全部读入内存中,在map阶段直接读取其他表的数据跟内存中表数据进行匹配。由于是在Map端进行join操作,省去reduce,运行效率会高些。
在对多表多join操作时,小表(通常是小于1G)在join左边,大表在join右边。这样执行join操作会将小表中的数据缓存到内存,可以减少发生内存溢出错误的几率。
5)将表分区,只读取符合条件的分区count distinct大量相同特殊值时,将值为空的情况单独处理,如果是计算count distinct,可以不用处理,直接过滤,在最后结果中加1。如果还有其他计算,需要进行group by,可以先将值为空的记录单独处理,再和其他计算结果进行union。
参考文章:数据分析系列(3):数据倾斜 | Free Will



