栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

说一下sparkshuffle(shuffle spark)

说一下sparkshuffle(shuffle spark)

背景

安装数据读取的6m的表,原始数据有好几个T,数据量巨大,任务处理耗时不稳定,需要优化.

处理逻辑

从表里读出来gid和pkg,都不为空大概有3.5亿条.1,5T的数据

  

需求是要获取安装列表,installed_applist,去除冗余信息,只保留app list,以","分隔生成的字符串,然后关联pkg→category_code的映射表,获得每个gid安装了哪些类别的category,然后统计每个类别的pkg安装了多少.

最开始的做法是:

    installed_applist去除额外信息,只保留app list,以","分隔,保留gid,pkg,生成表a读取pkg-code的映射表,生成表b表a和表b以pkg关联,获取gid,code,生成表ccode是多级分类拼接而成,要把code炸裂开,一行变多行,形成每行数据一个gid,一个code的局面(这里生成的数据量大概有90亿)对数据以gid,code分组,求和(一次shuffle)再对gid分组,将所以code和count组合生成json字符串(二次shuffle)再repartition输出(三次shuffle)

以上做法的问题有:

    shuffle过多,严重影响任务执行效率中间数据生成过多,炸裂导致数据量级高了一个层级(亿→十亿)链路过长,数据分分合合执行时间过长,资源yarn资源控制在1T左右的情况下,要执行3个小时左右(没成功过)任务运行不稳定,动不动就oom,container被yarn杀死
优化思路

针对以上问题进行了分析.发现,要求输出的结果是以gid为维度聚合的,而数据输入也是gid维度聚合的,那么,既然输入输出的聚合维度相同,那么就没有将聚合结果拆分再聚合的必要.

所以,设想在不进行shuffle的情况下,或者尽量少shuffle的情况下,对数据进行处理.

输入数据有gid和app,只是缺少了code信息,而发现code表的数据量级不大,只有200M左右,二千万条.所以考虑对code表进行广播,将code表生成一个pkg-code的map发送到每个executor.,每个task执行的时候直接从map中get pkg对应的code,因为广播的原因,每个executor只有一个map,相当于在map端join,就不会产生shuffle.

同时,读取数据源获得dataset后应用mappartition方法,每个partition生成一个task,数据只在本地处理,处理一次能直接输出结果.

在mappartition方法中创建一个对象(继承mappartition类,实现call方法),对partition数据进行遍历,则每行数据读进来天然就是按照gid维度聚合的(因为6m表是以gid分组聚合的,不会有重复的gid),对每个gid的applist替换成pkg-code的map中的code,再对code处理.所有的处理逻辑都在本地进行,不涉及shuffle,则任务的执行效率有大大的提高.

public class InstallAppToCodeFunction implements MapPartitionsFunction> {

    Broadcast> opBD;

    public InstallAppToCodeFunction(Broadcast> opBD) {
        this.opBD = opBD;
    }

    @Override
    // gid ,applist
    public Iterator> call(Iterator input) throws Exception {
        ArrayList> gidResult = new ArrayList<>();
        HashMap pkgCodeMap = opBD.value();
        while (input.hasNext()) {
            JSonObject jsonObject = new JSonObject();
            Row row = input.next(); //gid,pkg1,pkg2,pkg3
            String gid = row.getString(0);
            String appList = row.getString(1);
            String[] apps = appList.split("\,");
            for (String app : apps) {//[kg1
                String codeList = pkgCodeMap.get(app);//c1,c2,c3
                if(StringUtils.isNotBlank(codeList)) {
                    String[] codes = codeList.split(",");
                    for (String code : codes) {//c1
                        if(jsonObject.containsKey(code)) {
                            jsonObject.put(code, jsonObject.getIntValue(code) + 1);
                        } else {
                            jsonObject.put(code, 1);
                        }
                    }
                }
            }
            if(jsonObject.keySet().size() > 0) {
                gidResult.add(Tuple2.apply(gid, jsonObject.toJSonString()));
            }
        }

        return gidResult.iterator();
    }
}
 // 安装数据,输出gid|json
            //之前路径 join->炸裂->groupBy->groupBy->reparation 三次shuffle
            //现在路径 广播->mappartition(分区遍历,结果添加到map和list中,遍历完后将结果取出输出)->coalesce  没有shuffle
            appData
                    .filter(length(col(GID)).$greater(0).and(length(col(PKG)).$greater(0))) //过滤均为空的行
                    .mapPartitions(new InstallAppToCodeFunction(opBD), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                    .toDF(GID, DATA)
                    .filter(length(col(GID)).$greater(0).and(length(col(DATA)).$greater(0)))
                    .coalesce(6000)
                    .write()
                    .mode(SaveMode.Overwrite)
                    //.option(HEADER, String.valueOf(Boolean.FALSE))
                    .parquet(outPath);

同时,因为整个过程没有shuffle,读取的数据有多少个分区就会生成多少task,观察到读取数据任务数据量为3万,则需要在输出的时候控制下输出文件数量,刚开始是用的repartition方法,结果发现非常容易导致oom,虽然shuffle write的数据量只有50g左右,但是在shuffle read阶段读一会数据就oom了(非常奇怪,需要定位一下)同时注意mappartition方法返回值,经过多次试验,发现要想在调用该方法后再返回一个dataset,需要将方法返回值的类型设置为tuple,然后在调用方法后再调用toDF方法,为dataset添加schema.

因此改用coalesce方法,则设置的分区数就是最终输出的文件数量,同时也没有shufle.

同时,因为输出文件数量目测在50g左右,所以想当然把coalesce分区数控制在500左右(按照每个文件块128计算),结果导致每个task读取的数据量太大,频繁gc,所以后面把分区数量调大,gc情况明显好转.

目前任务运行稳定,在1T资源不到的情况下整个运行时间在1.5小时左右,阶段也只有2个(一个是读取维表),相当于整过过程没有shuffle.

总结
    shuffle是个非常耗时的过程,能不做就不做采用map端join是个比较好的避免shuffle的方式(广播变量)

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/772134.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号