栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

山东大学软件工程应用与实践——Pig代码分析(十一)

山东大学软件工程应用与实践——Pig代码分析(十一)

2021SC@SDUSC

概述
本次继续分析pig作为hadoop的轻量级脚本语言操作hadoop的executionengine包下的MRUtil类的代码。

静态方法impleConnectMapToReduce
simpleConnectMapToReduce 是一个实用程序,用于在 mapreduce 运算符中结束映射阶段并启动 reduce 阶段。
其中,mro 仅包含地图计划,同时需要添加POLocalRearrange到结束地图计划,并添加POPackage开始一个减少计划 。
整个程序中的POLocalRearrange/POPackage是微不足道的。

static public void simpleConnectMapToReduce(MapReduceOper mro, String scope, NodeIdGenerator nig) throws PlanException
    {
        PhysicalPlan ep = new PhysicalPlan();
        POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
        prjStar.setResultType(DataType.TUPLE);
        prjStar.setStar(true);
        ep.add(prjStar);
        List eps = new ArrayList();
        eps.add(ep);

        POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
        try {
            lr.setIndex(0);
        } catch (ExecException e) {
            int errCode = 2058;
            String msg = "Unable to set index on the newly created POLocalRearrange.";
            throw new PlanException(msg, errCode, PigException.BUG, e);
        }
        lr.setKeyType(DataType.TUPLE);
        lr.setPlans(eps);
        lr.setResultType(DataType.TUPLE);
        mro.mapPlan.addAsLeaf(lr);
        POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
        pkg.getPkgr().setKeyType(DataType.TUPLE);
        pkg.setNumInps(1);
        boolean[] inner = {false};
        pkg.getPkgr().setInner(inner);
        mro.reducePlan.add(pkg);

        mro.reducePlan.addAsLeaf(getPlainForEachOP(scope, nig));
    }

静态方法POForEach getPlainForEachOP

获取一个简单的 POForEach:
使用POForEach 生成 flat1

static public POForEach getPlainForEachOP(String scope, NodeIdGenerator nig)
    {
        List eps1 = new ArrayList();
        List flat1 = new ArrayList();
        PhysicalPlan ep1 = new PhysicalPlan();
        POProject prj1 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
        prj1.setResultType(DataType.TUPLE);
        prj1.setStar(false);
        prj1.setColumn(1);
        prj1.setOverloaded(true);
        ep1.add(prj1);
        eps1.add(ep1);
        flat1.add(true);
        POForEach fe = new POForEach(new OperatorKey(scope, nig
                .getNextNodeId(scope)), -1, eps1, flat1);
        fe.setResultType(DataType.BAG);
        return fe;
    }
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/679586.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号