本静态方法生成根据 woker 和 conf,得到 Task。首先根据 worker 的 class,得到一个 Task,并且设置 worker。
public staticTaskFactory#get(Class)Task get(T work, HiveConf conf) { @SuppressWarnings("unchecked") Task ret = get((Class ) work.getClass()); ret.setWork(work); if (null != conf) { ret.setConf(conf); } return ret; }
本静态方法生成根据 Task,遍历 taskvec,如果 workerClass 和 Worker 的class 相等,则拿到 taskClass,进行创建实例。
@VisibleForTesting statictaskvecTask get(Class workClass) { for (TaskTuple extends Serializable> t : taskvec) { if (t.workClass == workClass) { try { Task ret = (Task ) t.taskClass.newInstance(); ret.setId("Stage-" + Integer.toString(getAndIncrementId())); return ret; } catch (Exception e) { throw new RuntimeException(e); } } } throw new RuntimeException("No task for work class " + workClass.getName()); }
存储了所有 Worker class 和 Task Class 的映射。
public static ArrayList> taskvec; static { taskvec = new ArrayList >(); taskvec.add(new TaskTuple (MoveWork.class, MoveTask.class)); taskvec.add(new TaskTuple (FetchWork.class, FetchTask.class)); taskvec.add(new TaskTuple (CopyWork.class, CopyTask.class)); taskvec.add(new TaskTuple (ReplCopyWork.class, ReplCopyTask.class)); taskvec.add(new TaskTuple (DDLWork.class, DDLTask.class)); taskvec.add(new TaskTuple ( MaterializedViewDesc.class, MaterializedViewTask.class)); taskvec.add(new TaskTuple (FunctionWork.class, FunctionTask.class)); taskvec .add(new TaskTuple (ExplainWork.class, ExplainTask.class)); taskvec .add(new TaskTuple (ExplainSQRewriteWork.class, ExplainSQRewriteTask.class)); taskvec.add(new TaskTuple (ConditionalWork.class, ConditionalTask.class)); taskvec.add(new TaskTuple (MapredWork.class, MapRedTask.class)); taskvec.add(new TaskTuple (MapredLocalWork.class, MapredLocalTask.class)); taskvec.add(new TaskTuple (StatsWork.class, StatsTask.class)); taskvec.add(new TaskTuple (ColumnStatsUpdateWork.class, ColumnStatsUpdateTask.class)); taskvec.add(new TaskTuple (MergeFileWork.class, MergeFileTask.class)); taskvec.add(new TaskTuple (DependencyCollectionWork.class, DependencyCollectionTask.class)); taskvec.add(new TaskTuple (TezWork.class, TezTask.class)); taskvec.add(new TaskTuple (SparkWork.class, SparkTask.class)); taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class)); taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class)); taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class)); taskvec.add(new TaskTuple (ExportWork.class, ExportTask.class)); taskvec.add(new TaskTuple (ReplTxnWork.class, ReplTxnTask.class)); }



