MapReduce框架中有6个主要的通信协议,面向Client4个,MapReduce框架内部2个。
分别是:
- JobSubmissionProtocol:Client与JobTracker之间的通信协议。用户通过该协议提交作业,查看作业运行情况。
- RefreshUserMappingsProtocol:Client通过该协议更新用户-用户组映射关系。
- RefreshAuthorizationPolicyProtocol:Client通过该协议更新MapReduce服务级别访问控制列表。
- AdminOperationsProtocol:Client通过该协议更新队列(存在JobTracker或者Scheduler中)访问控制列表和节点列表。
- InterTrackerProtocol:TaskTracker与JobTracker之间的通信协议,TaskTracker通过相关接口汇报本节点的资源使用情况和任务运行状态等信息,并执行JobTracker发送的命令。
- TaskUmbilicalProtocol:Task与TaskTracker之间的通信协议,每个Task实际上是其同节点TaskTracker的子进程,他们通过该协议汇报Task运行状态,运行进度等信息。
所有Hadoop RPC的协议基类都是VersionedProtocol,该类主要用于描述协议版本号,以防止不同版本号的客户端与服务器端之间通信,除了最后一个与TaskTracker相关,上面五个都是与JobTracker相关的。
细讲通信协议我看的是hadoop-2.10的源码,发送在VersionedProtocol下找不到这几个协议,而找到了一个总的ClientProtocol。
同时书里的几个协议的内容,都集成到了这个ClientProtocol中了。
但是为了方便区分各个方法的作用,我还是按照书里来写。记住有这个变化就ok。
Client与JobTracker之间的通信协议,用户通过该协议提交作业和查看作业运行状态,该协议的接口分为三类
- 作业提交:
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException;
- 作业控制
public void killJob(JobID jobid) throws IOException, InterruptedException; public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException; public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException, InterruptedException;
- 查看系统状态和作业运行状态
public JobStatus getJobStatus(JobID jobid) throws IOException, InterruptedException; public JobStatus[] getAllJobs() throws IOException, InterruptedException; public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException;InterTrackerProtocol通信协议
TaskTracker和JobTracker之间的通信协议,TaskTracker通过该协议向JobTracker汇报所在节点的资源使用情况和任务运行状况,并接收和执行JobTracker返回的命令。
协议中最重要的方法就是heatbeat。它周期性被调用,形成TaskTracker和JobTracker之间的心跳。
Task和TaskTracker之间通信的协议。每个Task通过这个协议 向对应的TaskTracker汇报自己的运行状况或者出错信息。
该协议中的方法分成两类:一类是周期性被调用的方法,另一类是按需调用的方法
第一类方法:两个
boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) throws IOException, InterruptedException; boolean ping(TaskAttemptID taskid) throws IOException;
Task每隔3s会调用一次statusUpdate函数向TaskTracker汇报最新进度,然而,如果Task在3s内没有处理任何数据,则不再汇报进度,而是直接调用ping方法探测TaskTracker,确保当前数据处理过程中它一直是活着的。
第二类方法:在Task的不同运行阶段被调用,调用时机为:
- Task初始化:TaskTracker从JobTracker接收到一个启动新Task的命令(LaunchTaskAction)后,创建一个子进程(child),并由该子进程调用getTask方法领取对应的Task。
- Task运行中:汇报错误与异常、汇报记录范围、获取Map Task完成列表
汇报异常
void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException; void fsError(TaskAttemptID taskId, String message) throws IOException; void fatalError(TaskAttemptID taskId, String message) throws IOException; void shuffleError(TaskAttemptID taskId, String message) throws IOException;
汇报记录范围:Hadoop可跳过坏记录提高程序的容错性,此方法方便定位坏记录的位置。
void reportNextRecordRange(TaskAttemptID taskid, SortedRanges.Range range)
throws IOException;
获取Map Task完成列表:Reduce Task和Map Task之间存在依赖关系。此方法方便Recude Task从其Task Tracker中获取已经完成的Map Task列表,进而能够获取Map Task产生的临时数据存放位置,并远程读取(对应Reduce Task 的Shuffle阶段)这些数据。
MapTaskCompletionEventsUpdate getMapCompletionEvents(JobID jobId,
int fromIndex,
int maxLocs,
TaskAttemptID id)
throws IOException;
}
- Task运行完成
当Task处理完最后一条记录后,会完成最后收尾工作。
void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) throws IOException, InterruptedException; boolean canCommit(TaskAttemptID taskid) throws IOException; void done(TaskAttemptID taskid) throws IOException;



