栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop源码分析(六)

Hadoop源码分析(六)

2021SC@SDUSC 研究内容简略介绍

上周我们完成了对org.apache.hadoop.mapreduce.Counters中核心代码的分析,本周将从org.apache.hadoop.mapreduce.ID开始继续往下分析。

org.apache.hadoop.mapreduce.ID源码分析
package org.apache.hadoop.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparable;


@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ID implements WritableComparable {
  protected static final char SEPARATOR = '_';
  protected int id;

  
  public ID(int id) {
    this.id = id;
  }

  protected ID() {
  }

  
  public int getId() {
    return id;
  }

  @Override
  public String toString() {
    return String.valueOf(id);
  }

  @Override
  public int hashCode() {
    return id;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o)
      return true;
    if(o == null)
      return false;
    if (o.getClass() == this.getClass()) {
      ID that = (ID) o;
      return this.id == that.id;
    }
    else
      return false;
  }

  
  public int compareTo(ID that) {
    return this.id - that.id;
  }

  public void readFields(DataInput in) throws IOException {
    this.id = in.readInt();
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(id);
  }
  
}

ID类的功能十分简单,即为通用标识符,内部将 id 存储为整数,作为JobID, TaskID和TaskAttemptID的超类。其中包含一个构造器,get()方法,以及判断id是否相等的equals()。

作为ID类的子类,JobID, TaskID和TaskAttemptID都十分相似,因此我们以JobID为例来进行分析。

org.apache.hadoop.mapreduce.JobID源码分析

我们首先观察一下JobID的源码:

@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobID extends org.apache.hadoop.mapred.ID 
                   implements Comparable {
  public static final String JOB = "job";
  
  // Jobid regex for various tools and framework components
  public static final String JOBID_REGEX = 
    JOB + SEPARATOR + "[0-9]+" + SEPARATOR + "[0-9]+";
  
  private final Text jtIdentifier;
  
  protected static final NumberFormat idFormat = NumberFormat.getInstance();
  static {
    idFormat.setGroupingUsed(false);
    idFormat.setMinimumIntegerDigits(4);
  }
  
  
  public JobID(String jtIdentifier, int id) {
    super(id);
    this.jtIdentifier = new Text(jtIdentifier);
  }
  
  public JobID() { 
    jtIdentifier = new Text();
  }
  
  public String getJtIdentifier() {
    return jtIdentifier.toString();
  }
  
  @Override
  public boolean equals(Object o) {
    if (!super.equals(o))
      return false;

    JobID that = (JobID)o;
    return this.jtIdentifier.equals(that.jtIdentifier);
  }
  
  
  @Override
  public int compareTo(ID o) {
    JobID that = (JobID)o;
    int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
    if(jtComp == 0) {
      return this.id - that.id;
    }
    else return jtComp;
  }
  
  
  public StringBuilder appendTo(StringBuilder builder) {
    builder.append(SEPARATOR);
    builder.append(jtIdentifier);
    builder.append(SEPARATOR);
    builder.append(idFormat.format(id));
    return builder;
  }

  @Override
  public int hashCode() {
    return jtIdentifier.hashCode() + id;
  }

  @Override
  public String toString() {
    return appendTo(new StringBuilder(JOB)).toString();
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    this.jtIdentifier.readFields(in);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);
    jtIdentifier.write(out);
  }
  
  
  public static JobID forName(String str) throws IllegalArgumentException {
    if(str == null)
      return null;
    try {
      String[] parts = str.split("_");
      if(parts.length == 3) {
        if(parts[0].equals(JOB)) {
          return new org.apache.hadoop.mapred.JobID(parts[1], 
                                                    Integer.parseInt(parts[2]));
        }
      }
    }catch (Exception ex) {//fall below
    }
    throw new IllegalArgumentException("JobId string : " + str 
        + " is not properly formed");
  }
  
}

官网对该类的描述如下:
公共类JobID扩展了ID,同时实现Comparable < ID >。
JobID 表示作业的不可变和唯一标识符。JobID 由两部分组成。第一部分表示作业跟踪器标识符,因此定义了作业跟踪器映射的作业 ID。对于集群设置,此字符串是 jobtracker 开始时间,对于本地设置,它是“本地”和一个随机数。JobID 的第二部分是作业编号。

官方给出了一个实例:一个示例 JobID 是 : job_200707121733_0003,它表示在开始于 的作业跟踪器上运行的第三个作业200707121733。

应用程序不应构造或解析 JobID 字符串,而应使用适当的构造函数或forName(String)方法。

JobID中的基本方法有以下几个。

其中compareTo(),equals(),write()都是常见的方法。我们重点来看一下appendTo()。

appendTo()方法
public StringBuilder appendTo(StringBuilder builder) {
    builder.append(SEPARATOR);
    builder.append(jtIdentifier);
    builder.append(SEPARATOR);
    builder.append(idFormat.format(id));
    return builder;
  }

该方法传入的参数bulider为要附加到的构建器,附加之后,将该builder返回。
appendTo(StringBuilder builder) 能够将“job”前缀后的内容添加到给定的构建器。这样的添加很有必要,因为子 ID 在其字符串的开头使用此子字符串。

接下来我们继续分析其他类的源码。

org.apache.hadoop.mapreduce.InputFormat源码分析
package org.apache.hadoop.mapreduce;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputFormat {

  
  public abstract 
    List getSplits(JobContext context
                               ) throws IOException, InterruptedException;
  
  
  public abstract 
    RecordReader createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}


InputFormat是一个抽象类,用于获取Input输入数据,并将其切分和打成键值对。
InputFormat 描述 Map-Reduce 作业的输入规范。
Map-Reduce 框架依赖于InputFormat作业:

1.验证作业的输入规范。
2.将输入文件拆分为逻辑文件InputSplit,然后将每个文件分配给一个单独的Mapper.
3.提供RecordReader用于从逻辑中收集输入记录InputSplit以供Mapper.

基于文件的InputFormats(通常是的FileInputFormat子类)的默认行为是根据输入文件的总大小(以字节为单位)将输入FileInputFormat拆分为逻辑 InputSplits。但是,FileSystem输入文件的块大小被视为输入拆分的上限。分割大小的下限可以通过 mapreduce.input.fileinputformat.split.minsize设置 。

显然,基于输入大小的逻辑拆分对于许多应用程序来说是不够的,因为要遵守记录边界。在这种情况下,应用程序还必须实现RecordReader尊重记录边界的责任,并InputSplit为单个任务提供面向记录的逻辑视图。

这个类中有两个抽象方法:

(1)getSplits:负责将HDFS数据解析成InputSplit,也就是对原始数据进行切片,按照设定的切片大小进行逻辑切片;InputSplit只记录了切片的元数据信息,例如偏移量的起止位置、长度和所在节点列表等。

(2)createRecordReader:获取每个InputSplit,并且将其中的每一行解析成键值对。

对InputFormat的了解告一段落,下一周的博客中,我们将对InputFormat的几个重要子类,如FileInputFormat等展开详细的分析。

总结

本次我们首先分析了ID类,对ID类的作用有了初步的了解,然后开始了对其子类的分析,我们以JobID为例,探讨了它的源码,了解了其中的重点方法。同时我们阅读了InputFormat的源码,分析了他的功能,为之后分析其子类打下基础。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/467649.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号