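Overview
This installment continues the analysis of Pig, a lightweight scripting language for working with Hadoop, and looks at the code of the MRUtil class in the executionengine package.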
Static method simpleConnectMapToReduce
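simpleConnectMapToReduce is a utility that ends the map phase and starts the reduce phase within a MapReduce operator.
It assumes that mro contains only a map plan, so it appends a POLocalRearrange to close the map plan and adds a POPackage to open a reduce plan.
The POLocalRearrange and POPackage created here are trivial: the key is the whole tuple (a project-star plan), and the package has a single, non-inner input.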
static public void simpleConnectMapToReduce(MapReduceOper mro, String scope, NodeIdGenerator nig) throws PlanException
{
    // Key plan: a single project-star, so the whole tuple becomes the key.
    PhysicalPlan ep = new PhysicalPlan();
    POProject prjStar = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
    prjStar.setResultType(DataType.TUPLE);
    prjStar.setStar(true);
    ep.add(prjStar);

    List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
    eps.add(ep);

    // POLocalRearrange closes the map plan.
    POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope, nig.getNextNodeId(scope)));
    try {
        lr.setIndex(0);
    } catch (ExecException e) {
        int errCode = 2058;
        String msg = "Unable to set index on the newly created POLocalRearrange.";
        throw new PlanException(msg, errCode, PigException.BUG, e);
    }
    lr.setKeyType(DataType.TUPLE);
    lr.setPlans(eps);
    lr.setResultType(DataType.TUPLE);
    mro.mapPlan.addAsLeaf(lr);

    // POPackage opens the reduce plan: one input, not inner.
    POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
    pkg.getPkgr().setKeyType(DataType.TUPLE);
    pkg.setNumInps(1);
    boolean[] inner = {false};
    pkg.getPkgr().setInner(inner);
    mro.reducePlan.add(pkg);

    // Follow the package with the plain POForEach built by getPlainForEachOP.
    mro.reducePlan.addAsLeaf(getPlainForEachOP(scope, nig));
}
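To make the data flow easier to picture, here is a minimal sketch in plain Java of what the two operators do conceptually. It is a toy model and not Pig code: the class MapToReduceSketch, its methods, and the use of List<Object> as a stand-in for a tuple are all assumptions made for illustration. It also keeps keys in first-seen order, whereas a real MapReduce job shuffles and sorts them.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Pig classes.
final class MapToReduceSketch {

    // Hypothetical helper: a tuple is modeled as a list of fields.
    static List<Object> tuple(Object... fields) {
        return Arrays.asList(fields);
    }

    // Map side, in the spirit of POLocalRearrange with a project-star key plan:
    // the whole tuple is the key, and the tuple itself is the value.
    static List<Map.Entry<List<Object>, List<Object>>> localRearrange(List<List<Object>> tuples) {
        List<Map.Entry<List<Object>, List<Object>>> out = new ArrayList<>();
        for (List<Object> t : tuples) {
            out.add(new AbstractMap.SimpleEntry<>(t, t));
        }
        return out;
    }

    // Reduce side, in the spirit of POPackage with a single input:
    // values that share a key are collected into one bag per key.
    static Map<List<Object>, List<List<Object>>> packageByKey(
            List<Map.Entry<List<Object>, List<Object>>> pairs) {
        Map<List<Object>, List<List<Object>>> grouped = new LinkedHashMap<>();
        for (Map.Entry<List<Object>, List<Object>> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<List<Object>> input = Arrays.asList(tuple("a", 1), tuple("b", 2), tuple("a", 1));
        // The two identical ("a", 1) tuples end up in the same bag.
        System.out.println(packageByKey(localRearrange(input)));
    }
}
```

Because the key is the whole tuple, identical tuples land in the same bag, and every distinct tuple gets a bag of its own.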
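Static method getPlainForEachOP
Gets a simple POForEach.
The POForEach has a single inner plan that projects column 1, and flat1 holds the one flatten flag (true) for that column. In Pig Latin terms this corresponds to FOREACH X GENERATE FLATTEN($1).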
static public POForEach getPlainForEachOP(String scope, NodeIdGenerator nig)
{
    List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
    List<Boolean> flat1 = new ArrayList<Boolean>();

    // Inner plan: project column 1 of the input.
    PhysicalPlan ep1 = new PhysicalPlan();
    POProject prj1 = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
    prj1.setResultType(DataType.TUPLE);
    prj1.setStar(false);
    prj1.setColumn(1);
    prj1.setOverloaded(true);
    ep1.add(prj1);
    eps1.add(ep1);

    // Flatten the projected column.
    flat1.add(true);

    POForEach fe = new POForEach(new OperatorKey(scope, nig.getNextNodeId(scope)), -1, eps1, flat1);
    fe.setResultType(DataType.BAG);
    return fe;
}
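The effect of projecting column 1 and flattening it can be sketched in plain Java as follows. This is again a toy model rather than Pig code: FlattenSketch and its methods are hypothetical names, and the sketch assumes each input row has the shape (key, bag), with the bag in column 1, as a package step would produce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: none of these types are Pig classes.
final class FlattenSketch {

    // Hypothetical helper: a tuple is modeled as a list of fields.
    static List<Object> tuple(Object... fields) {
        return Arrays.asList(fields);
    }

    // Each input row is modeled as (key, bag), where the bag is a list of tuples.
    // Projecting column 1 selects the bag; flattening emits one output row per tuple in it.
    static List<List<Object>> flattenColumnOne(List<List<Object>> rows) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> row : rows) {
            @SuppressWarnings("unchecked")
            List<List<Object>> bag = (List<List<Object>>) row.get(1);
            out.addAll(bag);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> a = tuple("a", 1);
        List<Object> b = tuple("b", 2);
        List<List<Object>> rows = Arrays.asList(
            tuple(a, Arrays.asList(a, a)),   // key a, bag with two copies of a
            tuple(b, Arrays.asList(b)));     // key b, bag with one copy of b
        System.out.println(flattenColumnOne(rows)); // prints [[a, 1], [a, 1], [b, 2]]
    }
}
```

In this model the key in column 0 is dropped and only the bag contents are emitted, so the rows that went into the grouping step come back out one per line.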



