栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop16:【面试题】InputFormat以及OutputFormat分析

Hadoop16:【面试题】InputFormat以及OutputFormat分析

一、InputFormat

Hadoop中有一个抽象类是InputFormat,InputFormat抽象类是MapReduce输入数据的顶层基类,这个抽象类中只定义了两个方法

一个是getSplits方法
另一个是createRecordReader方法

这个抽象类下面有三个子继承类

DBInputFormat是操作数据库的,
FileInputFormat是操作文件类型数据的,
DelegatingInputFormat是用在处理多个输入时使用的

这里面比较常见的也就是FileInputFormat了,FileInputFormat是所有以文件作为数据源的基类,FileInputFormat保存job输入的所有文件,并实现了对输入文件计算splits的方法,至于获得文件中数据的方法是由子类实现的。
FileInputFormat下面还有一些子类:

CombineFileInputFormat:处理小文件问题的,后面我们再详细分析
TextInputFormat:是默认的处理类,处理普通文本文件,他会把文件中每一行作为一个记录,将每一行的起始偏移量作为key,每一行的内容作为value,这里的key和value就是我们之前所说的k1,v1
它默认以换行符或回车键作为一行记录
NLineInputFormat:可以动态指定一次读取多少行数据
这里面的TextInputFormat是我们处理文本数据的默认处理类,TextInputFormat的顶层基类是InputFormat,下面我们先来看一下这个抽象类的源码
1、下载源码

下载hadoop的源码:

https://archive.apache.org/dist/hadoop/common/hadoop-3.2.0/hadoop-3.2.0-src.tar.gz
2、解压导入

解压到指定目录下(D:IdeaProjects),然后找到里面的hadoop-mapreduce-client-core子项目,我们要分析的内容都在这里面。
D:IdeaProjectshadoop-3.2.0-srchadoop-mapreduce-projecthadoop-mapreduce-clienthadoop-mapreduce-client-core

将这个项目导入到idea中,这是一个maven项目,导入进去之后项目会自动开始下载依赖包,一直等依赖包下载完成即可,但是这将会是一个漫长的过程,有可能下载一会进度条就一动不动了,这都是很正常的现象,建议大家修改maven官方中央仓库的地址,改为国内镜像地址,这样速度会很快,但是注意了,改为国内镜像地址以后,有可能会遇到个别依赖下载不下来的情况,这也是很正常的现象,
所以说,maven官方中央仓库的特点是:依赖最全,但是下载速度贼慢
国内镜像仓库的特点是:下载速度贼快,但是可能会缺失部分依赖,我是遇到过这种问题的。

分析下来发现都不靠谱,所以最终给大家一个折中方案,先使用国内镜像仓库下载,将能下载的依赖包全部都下载下来,如果不缺东西,那正好,如果发现部分依赖找不到,那就还切换回官网中央仓库。

如何将maven依赖的下载地址改为国内镜像地址呢?
很简单,直接修改本地安装好的maven的conf目录下的settings.xml配置文件即可。
找到settings.xml配置文件中的标签,在这个标签中添加国内镜像地址,
在这里我们使用阿里云的maven国内镜像地址


      alimaven
      aliyun maven
      http://maven.aliyun.com/nexus/content/groups/public/
      central        

修改好之后重启idea,让他重新加载项目的依赖,或者打开cmd命令行窗口,进入到hadoop-mapreduce-client-core这个子项目,执行编译命令,这样就会开始下载项目需要的依赖了。

其实我是倾向于在cmd命令行窗口中执行,这样能看到下载的详细进度信息,并且下载进程也不会卡死,使用idea编辑器自动下载依赖时,时间过长可能会导致下载进程卡死

C:Usersyehua>cd D:IdeaProjectshadoop-3.2.0-srchadoop-mapreduce-projecthadoop-mapreduce-clienthadoop-mapreduce-client-core

C:Usersyehua>d:

D:IdeaProjectshadoop-3.2.0-srchadoop-mapreduce-projecthadoop-mapreduce-clienthadoop-mapreduce-client-core>mvn clean compile-DskipTests
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.261s
[INFO] Finished at: Fri Apr 24 22:03:19 CST 2020
[INFO] Final Memory: 35M/518M
[INFO] ------------------------------------------------------------------------

最后看到BUILD SUCCESS就说明编译成功了,编译成功也也就意味着这个项目的依赖都下载成功了。

下面就可以在idea中重新打开这个项目了

注意:打开这个项目打开以后需要在项目的src/main/java目录上点击右键–>Mark Directory as–>Sources ROOT

如果不执行这个操作,后面在查看源码的时候按Ctrl+左键,不能进入类或方法里面


接下来我们找到项目中的InputFormat抽象类,这个抽象类中一共就两个方法

getSplit
createRecordReader

看getSplit的注释

Logically split the set of input files for the job.

逻辑切分Job中输入文件的集合,其实这个就是我们前面说的MapReduce会对输入数据进行逻辑切分,获取InputSplit
所以这个方法的作用就是把输入文件的集合切分为InputSplit的集合

List getSplits(JobContext context) throws IOException, InterruptedException;

接着来看一下createRecordReader的注释

Create a record reader for a given split. The framework will call
{@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
the split is used.

针对每一个InputSplit创建一个RecordReader,在split被使用之前,框架会先调用RecordReader的initialize初始化方法。

RecordReader createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;
3、getSplits方法的具体实现代码

接下来先详细分析一下getSplits方法的具体实现代码,在FileInputFormat中有具体实现。

public List getSplits(JobContext job) throws IOException {
  StopWatch sw = new StopWatch().start();
  //获取InputSplit的size的最小值minSize和最大值maxSize
  
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  
  long maxSize = getMaxSplitSize(job);

  // 创建List,准备保存生成的InputSplit
  List splits = new ArrayList();
  //获取输入文件列表
  List files = listStatus(job);

  
  boolean ignoreDirs = !getInputDirRecursive(job)
    && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
  //迭代输入文件列表
  for (FileStatus file: files) {
    //是否忽略子目录,默认不忽略
    if (ignoreDirs && file.isDirectory()) {
      continue;
    }
    //获取 文件/目录 路径
    Path path = file.getPath();
    //获取 文件/目录 长度
    long length = file.getLen();
    if (length != 0) {
      //保存文件的Block块所在的位置
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      //判断文件是否支持切割,默认为true
      if (isSplitable(job, path)) {
        //获取文件的Block大小,默认128M
        long blockSize = file.getBlockSize();
        //计算split的大小
        
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);

        //还需要处理的文件剩余字节大小,其实就是这个文件的原始大小
        long bytesRemaining = length;
        //
        
        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          //组装InputSplit
          
          splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                      blkLocations[blkIndex].getHosts(),
                      blkLocations[blkIndex].getCachedHosts()));
          bytesRemaining -= splitSize;
        }

        //最后会把bytesRemaining/splitSize<=1.1的那一部分内容作为一个InputSplit
        if (bytesRemaining != 0) {
          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                     blkLocations[blkIndex].getHosts(),
                     blkLocations[blkIndex].getCachedHosts()));
        }
      } else { // not splitable
        //如果文件不支持切割,执行这里
        if (LOG.isDebugEnabled()) {
          // Log only if the file is big enough to be splitted
          if (length > Math.min(file.getBlockSize(), minSize)) {
            LOG.debug("File is not splittable so no parallelization "
                + "is possible: " + file.getPath());
          }
        }
        //把不支持切割的文件整个作为一个InputSplit
        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                    blkLocations[0].getCachedHosts()));
      }
    } else { 
      //Create empty hosts array for zero length files
      splits.add(makeSplit(path, 0, length, new String[0]));
    }
  }
  // Save the number of input files for metrics/loadgen
  job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
  sw.stop();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Total # of splits generated by getSplits: " + splits.size()
        + ", Timetaken: " + sw.now(TimeUnit.MILLISECONDS));
  }
  return splits;
}
4、面试题

根据刚才的分析,下面我们来看几个面试题?

(1)一个1G的文件,会产生多少个map任务?

Block块默认是128M,所以1G的文件会产生8个Block块
默认情况下InputSplit的大小和Block块的大小一致,每一个InputSplit会产生一个map任务
所以:1024/128=8个map任务

(2)1000个文件,每个文件100KB,会产生多少个map任务?

一个文件,不管再小,都会占用一个block,所以这1000个小文件会产生1000个Block,
那最终会产生1000个InputSplit,也就对应着会产生1000个map任务
一个140M的文件,会产生多少个map任务?
根据前面的分析

(3)140M的文件会产生2个Block,那对应的就会产生2个InputSplit了?

注意:这个有点特殊,140M/128M=1.09375<1.1
所以,这个文件只会产生一个InputSplit,也最终也就只会产生1个map 任务。
这个文件其实再稍微大1M就可以产生2个map 任务了。

5、验证测试

说一千、道一万 不如真刀真枪的干一场。
来实际操刀验证一下:

(1)生成测试数据

执行GenerateDat.java代码,会生成两个文件,一个140M,一个141M

GenerateDat.java代码如下:

package com.imooc.mr;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;


public class GenerateDat {
    public static void main(String[] args) throws Exception{
        generate_140M();
        generate_141M();
    }

    
    private static void generate_141M() throws IOException {
        String fileName = "D:\s_name_141.dat";
        System.out.println("start: 开始生成141M文件->"+fileName);
        BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
        int num = 0;
        while(num<8221592){
            bfw.write("zhangsan beijing");
            bfw.newline();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }
        System.out.println("end: 141M文件已生成");
    }

    
    private static void generate_140M() throws IOException {
        String fileName = "D:\s_name_140.dat";
        System.out.println("start: 开始生成140M文件->"+fileName);
        BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
        int num = 0;
        while(num<8201592){
            bfw.write("zhangsan beijing");
            bfw.newline();
            num ++;
            if(num%10000==0){
                bfw.flush();
            }
        }
        System.out.println("end: 140M文件已生成");
    }
}

start: 开始生成140M文件->D:s_name_140.dat
end: 140M文件已生成
start: 开始生成141M文件->D:s_name_141.dat
end: 141M文件已生成

(2)将数据上传到bigdata01机器的/data/soft目录下
[root@bigdata01 soft]# ll
.....
-rw-r--r--. 1 root root 147616384 Apr 24 23:33 s_name_140.dat
-rw-r--r--. 1 root root 147976384 Apr 24 23:33 s_name_141.dat
(3)将这两个文件上传到HDFS的根目录
[root@bigdata01 soft]# hdfs dfs -put s_name_140.dat  /
[root@bigdata01 soft]# hdfs dfs -put s_name_141.dat  / 
[root@bigdata01 soft]# hdfs dfs -ls /
........
-rw-r--r--   2 root supergroup  147616384 2020-04-24 23:37 /s_name_140.dat
-rw-r--r--   2 root supergroup  147976384 2020-04-24 23:37 /s_name_141.dat
(4)使用我们之前开发的WordCount代码分别计算着两个文件
注意,在使用WordCount代码的时候建议把代码中的日志输出信息注释掉,否则任务在执行过程中会打印过多的日志信息,额外占用linux本地磁盘空间
(5)计算140M文件
[root@bigdata01 soft]# cd hadoop-3.2.0
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /s_name_140.dat /out140

然后到yarn的8088界面查看,发现map任务只有一个

(6)再计算141M文件
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /s_name_141.dat /out141

然后到yarn的8088界面查看,发现map任务有两个

6、接下来看一下createRecordReader方法
createRecordReader的具体实现是在TextInputFormat类中

//针对每一个InputSplit都会创建一个RecordReader
@Override
public RecordReader 
  createRecordReader(InputSplit split,
                     TaskAttemptContext context) {
  //获取换行符,默认是没有配置的
  String delimiter = context.getConfiguration().get(
      "textinputformat.record.delimiter");
  byte[] recordDelimiterBytes = null;
  if (null != delimiter)
    recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
  //创建一个行阅读器
  return new LineRecordReader(recordDelimiterBytes);
}

在createRecordReader的最后会创建一个 LineRecordReader,所以接下来看一下这个类,前面我们说过了,框架会先调用这个阅读器的初始化方法initialize

//初始化方法
public void initialize(InputSplit genericSplit,
                       TaskAttemptContext context) throws IOException {
  //获取传过来的InputSplit,将InputSplit转换成子类FileSplit
  FileSplit split = (FileSplit) genericSplit;
  Configuration job = context.getConfiguration();
  //MAX_LINE_LENGTH对应的参数默认没有设置,所以会取Integer.MAX_VALUE
  this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
  //获取InputSplit的起始位置
  start = split.getStart();
  //获取InputSplit的结束位置
  end = start + split.getLength();
  //获取InputSplit的路径
  final Path file = split.getPath();

  // 打开文件,并且跳到InputSplit的起始位置
  final FileSystem fs = file.getFileSystem(job);
  fileIn = fs.open(file);

  // 获取文件的压缩信息
  CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
  //如果文件是压缩文件,则执行if中的语句
  if (null!=codec) {
    isCompressedInput = true;
    decompressor = CodecPool.getDecompressor(codec);
    if (codec instanceof SplittableCompressionCodec) {
      final SplitCompressionInputStream cIn =
        ((SplittableCompressionCodec)codec).createInputStream(
          fileIn, decompressor, start, end,
          SplittableCompressionCodec.READ_MODE.BYBLOCK);
      in = new CompressedSplitLineReader(cIn, job,
          this.recordDelimiterBytes);
      start = cIn.getAdjustedStart();
      end = cIn.getAdjustedEnd();
      filePosition = cIn;
    } else {
      if (start != 0) {
        // So we have a split that is only part of a file stored using
        // a Compression codec that cannot be split.
        throw new IOException("Cannot seek in " +
            codec.getClass().getSimpleName() + " compressed stream");
      }

      in = new SplitLineReader(codec.createInputStream(fileIn,
          decompressor), job, this.recordDelimiterBytes);
      filePosition = fileIn;
    }
  } else {
    //如果文件是未压缩文件(普通文件),则执行else中的语句
    //跳转到文件中的指定位置
    fileIn.seek(start);
    //针对未压缩文件,创建一个阅读器读取一行一行的数据
    in = new UncompressedSplitLineReader(
        fileIn, job, this.recordDelimiterBytes, split.getLength());
    filePosition = fileIn;
  }
  // If this is not the first split, we always throw away first record
  // because we always (except the last split) read one extra line in
  // next() method.
  
  //如果start不等于0,表示不是第一个InputSplit,所以就把start的值重置为第二行的起始位置
  if (start != 0) {
    start += in.readLine(new Text(), 0, maxBytesToConsume(start));
  }
  this.pos = start;
}

分析完初始化方法,接下来看一下这个阅读器中最重要的方法nextKeyValue()

public boolean nextKeyValue() throws IOException {
  if (key == null) {
    key = new LongWritable();
  }
  // k1 就是每一行的起始位置
  key.set(pos);
  if (value == null) {
    value = new Text();
  }
  int newSize = 0;
  // We always read one extra line, which lies outside the upper
  // split limit i.e. (end - 1)
  while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
    if (pos == 0) {
      newSize = skipUtfByteOrderMark();
    } else {
      //读取一行数据,赋值给value,也就是v1
      newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
      pos += newSize;
    }

    if ((newSize == 0) || (newSize < maxLineLength)) {
      break;
    }

    // line too long. try again
    LOG.info("Skipped line of size " + newSize + " at pos " + 
             (pos - newSize));
  }
  if (newSize == 0) {
    key = null;
    value = null;
    return false;
  } else {
    return true;
  }
}
二、OutputFormat

OutputFormat分析
前面我们分析了InputFormat,下面我们来分析一下OutputFormat,顾名思义,这个是控制MapReduce输出的。

OutputFormat是输出数据的顶层基类
FileOutputFormat:文件数据处理基类
TextOutputFormat:默认文本文件处理类

这几个其实和InputFormat中的那几个文本处理类是对应着的,当然了针对输出数据还有其它类型的处理类,我们在这先分析最常见的文本文件处理类,其他类型的等我们遇到具体场景再具体分析。

我们来看一下OutputFormat的源码,这个类主要由三个方法

getRecordWriter
checkOutputSpecs
getOutputCommitter

先看一下getRecordWriter这个方法的具体实现,在TextOutputFormat中

public RecordWriter 
       getRecordWriter(TaskAttemptContext job
                       ) throws IOException, InterruptedException {
  Configuration conf = job.getConfiguration();
  //任务的输出数据是否需要压缩,默认为false,也就是不压缩
  boolean isCompressed = getCompressOutput(job);
  //获取输出的key(k3)和value(v3)之间的分隔符,默认为t
  String keyValueSeparator= conf.get(SEPARATOR, "t");
  CompressionCodec codec = null;
  String extension = "";
  if (isCompressed) {
    Class codecClass = 
      getOutputCompressorClass(job, GzipCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, conf);
    extension = codec.getDefaultExtension();
  }
  //获取输出文件路径信息
  Path file = getDefaultWorkFile(job, extension);
  FileSystem fs = file.getFileSystem(conf);
  //获取文件输出流
  FSDataOutputStream fileOut = fs.create(file, false);
  if (isCompressed) {
    return new LineRecordWriter<>(
        new DataOutputStream(codec.createOutputStream(fileOut)),
        keyValueSeparator);
  } else {
    //创建行书写器
    return new LineRecordWriter<>(fileOut, keyValueSeparator);
  }
}

最后会创建一个行书写器,具体看一下代码实现,它里面有一个核心方法write()

public synchronized void write(K key, V value)
  throws IOException {

  boolean nullKey = key == null || key instanceof NullWritable;
  boolean nullValue = value == null || value instanceof NullWritable;
  //如果key和value都是null 则直接返回
  if (nullKey && nullValue) {
    return;
  }
  //如果key不为null,则写入key
  if (!nullKey) {
    writeObject(key);
  }
  //key和value都不为null,这里面的条件才会成立,会写入key和value的分隔符
  //注意:视频中上面这一行的注释添加的有误,以这里为准
  if (!(nullKey || nullValue)) {
    out.write(keyValueSeparator);
  }
  //如果value不为null,则写入value
  if (!nullValue) {
    writeObject(value);
  }
  //写入换行符
  out.write(newline);
}

接着是checkOutputSpecs方法,这个方法的具体实现在FileOutputFormat中

public void checkOutputSpecs(JobContext job
                             ) throws FileAlreadyExistsException, IOException{
  // Ensure that the output directory is set and not already there
  //根据输出目录创建path
  Path outDir = getOutputPath(job);
  //如果获取不到,则说明任务没有设置输出目录,直接报错即可
  if (outDir == null) {
    throw new InvalidJobConfException("Output directory not set.");
  }

  // get delegation token for outDir's file system
  TokenCache.obtainTokensForNamenodes(job.getCredentials(),
      new Path[] { outDir }, job.getConfiguration());

  //判断输出目录是否已存在,如果存在则抛异常
  if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
    throw new FileAlreadyExistsException("Output directory " + outDir + 
                                         " already exists");
  }
}

最后看一下getOutputCommitter,这个方法的具体实现在FileOutputFormat中

//获取输出数据提交器,负责将数据写入到输出目录中
public synchronized
    OutputCommitter getOutputCommitter(TaskAttemptContext context)
    throws IOException {
  if (committer == null) {
    Path output = getOutputPath(context);
    committer = PathOutputCommitterFactory.getCommitterFactory(
        output,
        context.getConfiguration()).createOutputCommitter(output, context);
  }
  return committer;
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/753764.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号