上周我们在学习了RecordReader的基础上分析了RecordWriter的源码,并对mapreduce中最关键的几个类之一Reducer进行了深入分析,了解了其函数以及工作流程,并结合之前学习的mapper分析了mapreduce的粗略过程。本次我们从org.apache.hadoop.mapreduce.TaskAttemptID开始,完成最后一部分的分析。
org.apache.hadoop.mapreduce.TaskAttemptID源码分析首先附上TaskAttemptID源码:
public TaskAttemptID(TaskID taskId, int id) {
super(id);
if(taskId == null) {
throw new IllegalArgumentException("taskId cannot be null");
}
this.taskId = taskId;
}
public TaskAttemptID(String jtIdentifier, int jobId, TaskType type,
int taskId, int id) {
this(new TaskID(jtIdentifier, jobId, type, taskId), id);
}
@Deprecated
public TaskAttemptID(String jtIdentifier, int jobId, boolean isMap,
int taskId, int id) {
this(new TaskID(jtIdentifier, jobId, isMap, taskId), id);
}
public TaskAttemptID() {
taskId = new TaskID();
}
public JobID getJobID() {
return taskId.getJobID();
}
public TaskID getTaskID() {
return taskId;
}
@Deprecated
public boolean isMap() {
return taskId.isMap();
}
public TaskType getTaskType() {
return taskId.getTaskType();
}
@Override
public boolean equals(Object o) {
if (!super.equals(o))
return false;
TaskAttemptID that = (TaskAttemptID)o;
return this.taskId.equals(that.taskId);
}
protected StringBuilder appendTo(StringBuilder builder) {
return taskId.appendTo(builder).append(SEPARATOR).append(id);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
taskId.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
taskId.write(out);
}
@Override
public int hashCode() {
return taskId.hashCode() * 5 + id;
}
@Override
public int compareTo(ID o) {
TaskAttemptID that = (TaskAttemptID)o;
int tipComp = this.taskId.compareTo(that.taskId);
if(tipComp == 0) {
return this.id - that.id;
}
else return tipComp;
}
@Override
public String toString() {
return appendTo(new StringBuilder(ATTEMPT)).toString();
}
public static TaskAttemptID forName(String str
) throws IllegalArgumentException {
if(str == null)
return null;
String exceptionMsg = null;
try {
String[] parts = str.split(Character.toString(SEPARATOR));
if(parts.length == 6) {
if(parts[0].equals(ATTEMPT)) {
String type = parts[3];
TaskType t = TaskID.getTaskType(type.charAt(0));
if(t != null) {
return new org.apache.hadoop.mapred.TaskAttemptID
(parts[1],
Integer.parseInt(parts[2]),
t, Integer.parseInt(parts[4]),
Integer.parseInt(parts[5]));
} else
exceptionMsg = "Bad TaskType identifier. TaskAttemptId string : "
+ str + " is not properly formed.";
}
}
} catch (Exception ex) {
//fall below
}
if (exceptionMsg == null) {
exceptionMsg = "TaskAttemptId string : " + str
+ " is not properly formed";
}
throw new IllegalArgumentException(exceptionMsg);
}
}
官方对TaskAttemptID类的解释如下:
TaskAttemptID扩展了ID类。
TaskAttemptID 表示任务尝试的不可变和唯一标识符。每个任务尝试都是由其 TaskID 标识的 Map 或 Reduce 任务的一个特定实例。TaskAttemptID 由 2 部分组成。
第一部分是 TaskID这个 TaskAttemptID 所属的 。
第二部分是任务尝试次数。
一个示例 TaskAttemptID 是 :
attempt_200707121733_0003_m_000005_0,它表示在开始于 的作业跟踪器上运行的第三个作业中第五个映射任务的第零个任务尝试200707121733。
应用程序不应构造或解析 TaskAttemptID 字符串,而应使用适当的构造函数或forName(String) 方法。
TaskAttemptID的功能就是提供一个task attempt的ID,而一个task attempt是一个Map Task或Reduce Task实例。TaskAttemptID由两部分组成,第一部分是TaskID,第二部分是task attempt序号。TaskAttemptID的格式如下:
attempt_201108091551_0001_m_000000_0 或 attempt_local_0001_m_000000_0
attempt_201108091551_0001_m_000000_0表示2011年8月9日15时51分启动的JobTracker中第0001号作业的第000000号map task的第0号task attempt。我们分段来分析:
attempt:表示该标识代表task attempt;
201108091551:表示JobTracker启动时间;
0001:表示在JobTracker上运行的0001号job;
000000:如果紧跟后面的字母是“m”,表示000000号map task,如果紧跟后面的字母是“r”,则表示000000号reduce task;
0:表示0号task attempt;
第二种格式有点不一样,在第二个字段中不是JobTracker的启动日期,而是local,表示单机或伪集群模式运行,其他字段含义与第一种格式一样。
org.apache.hadoop.mapreduce.TaskAttemptContext源码分析分析了TaskAttemptID类后,我们现在来分析TaskAttemptContext类。由类名可以知道,该类是提供task attempt的相关信息。
我们知道,用户向Hadoop提交Job(作业),Job在JobTracker对象的控制下执行。Job不是独立完成的,Job提交后,Hadoop根据集群的规模将Job分解为若干个Task(任务),然后分发到集群中,在TaskTracker的控制下运行。Task包括Map Task和Reduce Task,是MapReduce的Map操作和Reduce操作的地方。
getTaskAttemptID:获取task attempt的ID,即类似attempt_local_0001_m_000000_0; setStatus:设置task当前的状态; getStatus:获取task当前的状态; progress:报告进度,这是个空方法,子类会通过重写这份方法来进行相关操作,如使用StatusReporter报告进度;
TaskInputOutputContext继承TaskAttemptContext,重写了setStatus方法和progress方法。在TaskInputOutputContext的这两个方法中,都使用StatusReporter对象。
可见,继承TaskAttemptContext的子类主要是重写这两个方法来完成工作的。
分析了TaskAttemptID与TaskAttemptContext之后,我们来看看TaskAttempt在mapreduce中的实际作用。
我们知道,MapReduce有三层调度模型,即Job——>Task——>TaskAttempt,并且:
1、通常一个Job存在多个Task,这些Task总共有Map Task和Redcue Task两种大的类型(为简化描述,Map-Only作业、JobSetup Task等复杂的情况这里不做考虑);
2、每个Task可以尝试运行1-n此,而且通常很多情况下都是1次,只有当开启了推测执行原理且存在拖后腿Task,或者Task之前执行失败时,Task才执行多次。
而TaskImpl中存在一个成员变量attempts,用来存储Task所包含TaskAttempt中TaskAttemptId与TaskAttempt的映射关系,定义及初始化如下:
private Mapattempts; this.attempts = Collections.emptyMap();
也就是说,attempts一开始被初始化为Collections.emptyMap(),我们看下其实现:
@SuppressWarnings("unchecked")
public static final Map emptyMap() {
return (Map) EMPTY_MAP;
}
@SuppressWarnings("unchecked")
public static final Map EMPTY_MAP = new EmptyMap<>();
private static class EmptyMap
extends AbstractMap
implements Serializable
{
private static final long serialVersionUID = 6428348081105594320L;
public int size() {return 0;}
public boolean isEmpty() {return true;}
public boolean containsKey(Object key) {return false;}
public boolean containsValue(Object value) {return false;}
public V get(Object key) {return null;}
public Set keySet() {return emptySet();}
public Collection values() {return emptySet();}
public Set> entrySet() {return emptySet();}
public boolean equals(Object o) {
return (o instanceof Map) && ((Map,?>)o).isEmpty();
}
public int hashCode() {return 0;}
// Preserves singleton property
private Object readResolve() {
return EMPTY_MAP;
}
}
可以看出,EmptyMap就是一个空的Map,大小为0,isEmpty为true,containsKey和containsValue等针对任何key或value均为false。
而在生成TaskAttempt后将其添加至attempts的逻辑如下:
// 将创建的任务运行尝试TaskAttemptImpl实例attempt与其ID的对应关系添加到TaskImpl的任务运行尝试集合attempts中,
// attempts先被初始化为Collections.emptyMap()
// this.attempts = Collections.emptyMap();
switch (attempts.size()) {
case 0:
// 如果attempts大小为0,即为Collections.emptyMap(),则将其更换为Collections.singletonMap(),并加入该TaskAttemptImpl实例attempt
attempts = Collections.singletonMap(attempt.getID(),
(TaskAttempt) attempt);
break;
case 1:
// 如果attempts大小为1,即为Collections.singletonMap(),则将其替换为linkedHashMap,并加入之前和现在的TaskAttemptImpl实例attempt
Map newAttempts
= new linkedHashMap(maxAttempts);
newAttempts.putAll(attempts);
attempts = newAttempts;
attempts.put(attempt.getID(), attempt);
break;
default:
// 如果attempts大小大于1,说明其实一个linkedHashMap,直接put吧
attempts.put(attempt.getID(), attempt);
break;
}
当Task第一次生成TaskAttempt,并将其加入attempts时,attempts为Collections.emptyMap(),其大小肯定为0,此时将TaskAttempt加入attempts时,会将attempts转换成Collections.singletonMap,即只含有一个Key-Value对的Map。而Collections.singletonMap定义如下:
public staticMap singletonMap(K key, V value) { return new SingletonMap<>(key, value); }
private static class SingletonMapextends AbstractMap implements Serializable { private static final long serialVersionUID = -6979724477215052911L; private final K k; private final V v; SingletonMap(K key, V value) { k = key; v = value; } public int size() {return 1;} public boolean isEmpty() {return false;} public boolean containsKey(Object key) {return eq(key, k);} public boolean containsValue(Object value) {return eq(value, v);} public V get(Object key) {return (eq(key, k) ? v : null);} private transient Set keySet = null; private transient Set > entrySet = null; private transient Collection values = null; public Set keySet() { if (keySet==null) keySet = singleton(k); return keySet; } public Set > entrySet() { if (entrySet==null) entrySet = Collections. >singleton( new SimpleImmutableEntry<>(k, v)); return entrySet; } public Collection values() { if (values==null) values = singleton(v); return values; } }
由此可以看出,SingletonMap是只包含一对Key-Value的Map,其size大小固定为1,containsKey和containsValue返回入参key、value是否与SingletonMap内部的k、v相等,get会根据入参是否为k,来确定返回v还是null,等等。
而当attempts大小为1,即为Collections.singletonMap时,再添加TaskAttempt的话,就需要将attempts更换为linkedHashMap,将之前的和新添加的TaskAttempt加入,此后,如果再有TaskAttempt要加入的话,直接put即可。linkedHashMap初始化时,其容量已被确定,为maxAttempts,这个maxAttempts取自方法getMaxAttempts(),它在TaskImpl中是一个抽象方法,由其两个子类MapTaskImpl、ReduceTaskImpl分别实现,如下:
TaskImpl.java
// No override of this method may require that the subclass be initialized. protected abstract int getMaxAttempts();
MapTaskImpl.java
@Override
protected int getMaxAttempts() {
return conf.getInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4);
}
ReduceTaskImpl.java
@Override
protected int getMaxAttempts() {
return conf.getInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);
}
可见,Map和Reduce任务的TaskAttempt都有一个限制,分别取自参数mapreduce.map.maxattempts、mapreduce.reduce.maxattempts,参数未配置的话,均默认为4。既然有了TaskAttempt个数的上限,那么我们初始化linkedHashMap指定容量即可,其构造如下:
public linkedHashMap(int initialCapacity) {
super(initialCapacity);
accessOrder = false;
}
确定其初始容量为指定的initialCapacity。
MapReduce为什么要这么设计呢?我想了想,大体有关于业务逻辑和性能等方面的两个原因:
1、Task的调度执行是有顺序的,而Task的抽象类TaskImpl的实现类,无论是MapTaskImpl,还是ReduceTaskImpl的构造,都是必须先进行的,这样就有一个问题,如果attempts上来就被构造为指定大小的linkedHashMap,势必会造成空间的浪费,还有性能的消耗,况且,作业执行成功与否,还是后话,而如果我们初始化为Collections.emptyMap(),则很容易解决上面两个问题;
2、按照常理来说,理想情况下,每个Task应该有且只有一个TaskAttempt,只有当任务运行失败后重试,或开启推测执行机制后为有效加快拖后腿任务的执行而开启的备份任务等情况时,才会存在多个TaskAttempt,而在第一个TaskAttempt被构造时,将attempts由Collections.emptyMap()升级为Collections.singletonMap(),无论是在空间利用、性能上,还是业务逻辑上,都比较贴合实际情况;
3、再需要重试任务或开启备份任务时,才将attempts由Collections.singletonMap()升级为指定容量的linkedHashMap,里面有延迟加载的理念;
4、占用资源越少,性能越高,对于其他作业或任务来说,能够整体提高集群的资源利用效率。
总结本次我们从TaskAttemptID开始,逐步分析了其相关类TaskAttemptContext,以及TaskAttempt在mapreduce中的实际使用。对MapReduce的三层调度模型,即Job——>Task——>TaskAttempt有了更加完整的了解。至此我们通过十三篇博客,完成了对Hadoop中的MapReduce核心源码分析。



