安装数据读取的6m的表,原始数据有好几个T,数据量巨大,任务处理耗时不稳定,需要优化.
处理逻辑从表里读出来gid和pkg,都不为空大概有3.5亿条.1,5T的数据
需求是要获取安装列表,installed_applist,去除冗余信息,只保留app list,以","分隔生成的字符串,然后关联pkg→category_code的映射表,获得每个gid安装了哪些类别的category,然后统计每个类别的pkg安装了多少.
最开始的做法是:
- installed_applist去除额外信息,只保留app list,以","分隔,保留gid,pkg,生成表a读取pkg-code的映射表,生成表b表a和表b以pkg关联,获取gid,code,生成表ccode是多级分类拼接而成,要把code炸裂开,一行变多行,形成每行数据一个gid,一个code的局面(这里生成的数据量大概有90亿)对数据以gid,code分组,求和(一次shuffle)再对gid分组,将所以code和count组合生成json字符串(二次shuffle)再repartition输出(三次shuffle)
以上做法的问题有:
- shuffle过多,严重影响任务执行效率中间数据生成过多,炸裂导致数据量级高了一个层级(亿→十亿)链路过长,数据分分合合执行时间过长,资源yarn资源控制在1T左右的情况下,要执行3个小时左右(没成功过)任务运行不稳定,动不动就oom,container被yarn杀死
针对以上问题进行了分析.发现,要求输出的结果是以gid为维度聚合的,而数据输入也是gid维度聚合的,那么,既然输入输出的聚合维度相同,那么就没有将聚合结果拆分再聚合的必要.
所以,设想在不进行shuffle的情况下,或者尽量少shuffle的情况下,对数据进行处理.
输入数据有gid和app,只是缺少了code信息,而发现code表的数据量级不大,只有200M左右,二千万条.所以考虑对code表进行广播,将code表生成一个pkg-code的map发送到每个executor.,每个task执行的时候直接从map中get pkg对应的code,因为广播的原因,每个executor只有一个map,相当于在map端join,就不会产生shuffle.
同时,读取数据源获得dataset后应用mappartition方法,每个partition生成一个task,数据只在本地处理,处理一次能直接输出结果.
在mappartition方法中创建一个对象(继承mappartition类,实现call方法),对partition数据进行遍历,则每行数据读进来天然就是按照gid维度聚合的(因为6m表是以gid分组聚合的,不会有重复的gid),对每个gid的applist替换成pkg-code的map中的code,再对code处理.所有的处理逻辑都在本地进行,不涉及shuffle,则任务的执行效率有大大的提高.
public class InstallAppToCodeFunction implements MapPartitionsFunction> { Broadcast
> opBD; public InstallAppToCodeFunction(Broadcast > opBD) { this.opBD = opBD; } @Override // gid ,applist public Iterator > call(Iterator input) throws Exception { ArrayList
> gidResult = new ArrayList<>(); HashMap pkgCodeMap = opBD.value(); while (input.hasNext()) { JSonObject jsonObject = new JSonObject(); Row row = input.next(); //gid,pkg1,pkg2,pkg3 String gid = row.getString(0); String appList = row.getString(1); String[] apps = appList.split("\,"); for (String app : apps) {//[kg1 String codeList = pkgCodeMap.get(app);//c1,c2,c3 if(StringUtils.isNotBlank(codeList)) { String[] codes = codeList.split(","); for (String code : codes) {//c1 if(jsonObject.containsKey(code)) { jsonObject.put(code, jsonObject.getIntValue(code) + 1); } else { jsonObject.put(code, 1); } } } } if(jsonObject.keySet().size() > 0) { gidResult.add(Tuple2.apply(gid, jsonObject.toJSonString())); } } return gidResult.iterator(); } }
// 安装数据,输出gid|json
//之前路径 join->炸裂->groupBy->groupBy->reparation 三次shuffle
//现在路径 广播->mappartition(分区遍历,结果添加到map和list中,遍历完后将结果取出输出)->coalesce 没有shuffle
appData
.filter(length(col(GID)).$greater(0).and(length(col(PKG)).$greater(0))) //过滤均为空的行
.mapPartitions(new InstallAppToCodeFunction(opBD), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.toDF(GID, DATA)
.filter(length(col(GID)).$greater(0).and(length(col(DATA)).$greater(0)))
.coalesce(6000)
.write()
.mode(SaveMode.Overwrite)
//.option(HEADER, String.valueOf(Boolean.FALSE))
.parquet(outPath);
同时,因为整个过程没有shuffle,读取的数据有多少个分区就会生成多少task,观察到读取数据任务数据量为3万,则需要在输出的时候控制下输出文件数量,刚开始是用的repartition方法,结果发现非常容易导致oom,虽然shuffle write的数据量只有50g左右,但是在shuffle read阶段读一会数据就oom了(非常奇怪,需要定位一下)同时注意mappartition方法返回值,经过多次试验,发现要想在调用该方法后再返回一个dataset,需要将方法返回值的类型设置为tuple,然后在调用方法后再调用toDF方法,为dataset添加schema.
因此改用coalesce方法,则设置的分区数就是最终输出的文件数量,同时也没有shufle.
同时,因为输出文件数量目测在50g左右,所以想当然把coalesce分区数控制在500左右(按照每个文件块128计算),结果导致每个task读取的数据量太大,频繁gc,所以后面把分区数量调大,gc情况明显好转.
目前任务运行稳定,在1T资源不到的情况下整个运行时间在1.5小时左右,阶段也只有2个(一个是读取维表),相当于整过过程没有shuffle.
总结- shuffle是个非常耗时的过程,能不做就不做采用map端join是个比较好的避免shuffle的方式(广播变量)



