
007 Hadoop MapReduce In Depth


1. MapReduce Framework Internals

MapTask sort: quicksort (in memory) + merge sort (on disk)
ReduceTask sort: merge sort (on disk or in memory)
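The two-stage model above (sort each in-memory buffer, then merge the sorted runs) can be sketched in plain Java. This is an illustration of the algorithmic idea only, not Hadoop code; the method names are made up for the sketch:

```java
import java.util.*;

// Toy sketch of MapTask's sort model: each "spill" is sorted in memory
// (Arrays.sort on ints is a dual-pivot quicksort), then the sorted runs
// are combined with a k-way merge, as happens with spill files on disk.
class SpillMergeSketch {

    // Sort one in-memory buffer before spilling it.
    static int[] sortSpill(int[] buffer) {
        int[] run = buffer.clone();
        Arrays.sort(run);
        return run;
    }

    // K-way merge of already-sorted runs: the on-disk merge step in miniature.
    static int[] mergeRuns(List<int[]> runs) {
        // Priority queue ordered by each run's current head element;
        // entries are {runIndex, value, offset}.
        PriorityQueue<int[]> pq = new PriorityQueue<>(Comparator.comparingInt((int[] c) -> c[1]));
        int total = 0;
        for (int i = 0; i < runs.size(); i++) {
            int[] run = runs.get(i);
            total += run.length;
            if (run.length > 0) pq.add(new int[]{i, run[0], 0});
        }
        int[] out = new int[total];
        int n = 0;
        while (!pq.isEmpty()) {
            int[] head = pq.poll();
            out[n++] = head[1];
            int[] run = runs.get(head[0]);
            int next = head[2] + 1;
            if (next < run.length) pq.add(new int[]{head[0], run[next], next});
        }
        return out;
    }

    public static void main(String[] args) {
        List<int[]> runs = new ArrayList<>();
        runs.add(sortSpill(new int[]{5, 1, 9}));
        runs.add(sortSpill(new int[]{4, 2}));
        runs.add(sortSpill(new int[]{8, 3}));
        System.out.println(Arrays.toString(mergeRuns(runs))); // [1, 2, 3, 4, 5, 8, 9]
    }
}
```

The same shape appears twice in the sources below: MapTask merges its spill files, and ReduceTask merges the map outputs it copies.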

MapTask.java

@Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    // ...
}

ReduceTask.java

@Override
  @SuppressWarnings("unchecked")
  public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

    if (isMapOrReduce()) {
      copyPhase = getProgress().addPhase("copy");
      // merge sort
      sortPhase  = getProgress().addPhase("sort"); 
      reducePhase = getProgress().addPhase("reduce");
    }
    // ...
  }
1.1. InputFormat

1.1.1. Splitting
  • Splitting divides a file logically by size; each split produces one MapTask.
  • Splitting considers each file on its own, not the input dataset as a whole.
  • By default the split size equals the HDFS block size, so that reading a split does not cross machines.
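The default split-size rule can be sketched in plain Java, mirroring the computeSplitSize call and SPLIT_SLOP loop shown in the FileInputFormat source later in this section (the helper names here are illustrative, not Hadoop's):

```java
// Minimal sketch of FileInputFormat's split-size math.
class SplitMath {
    static final double SPLIT_SLOP = 1.1;   // 10% slop, as in FileInputFormat

    // splitSize = max(minSize, min(maxSize, blockSize)):
    // with default minSize=1 and maxSize=Long.MAX_VALUE this is the block size.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits for one file: keep cutting while the remainder
    // exceeds splitSize * 1.1, then emit one split for the tail.
    static int countSplits(long fileLength, long splitSize) {
        int splits = 0;
        long remaining = fileLength;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining != 0) splits++;
        return splits;
    }

    public static void main(String[] args) {
        long block = 128L * 1024 * 1024;               // 128 MB HDFS block
        long split = computeSplitSize(block, 1L, Long.MAX_VALUE);
        System.out.println(split == block);            // true: default split == block size
        // 129 MB is within the 10% slop, so it stays a single split:
        System.out.println(countSplits(129L * 1024 * 1024, split)); // 1
        System.out.println(countSplits(300L * 1024 * 1024, split)); // 3
    }
}
```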
1.1.2. The InputFormat hierarchy
  • FileInputFormat is a subclass of InputFormat that implements the splitting logic (getSplits() produces the splits).
  • TextInputFormat is a subclass of FileInputFormat that implements the reading logic (createRecordReader() returns a RecordReader that reads the data line by line).
  • CombineFileInputFormat is a subclass of FileInputFormat with its own splitting logic, suited to jobs over many small files.

Note: Hadoop's defaults are FileInputFormat for splitting and TextInputFormat for reading.

1.1.3. Source walkthrough: TextInputFormat is Hadoop's default InputFormat

Job.java

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    // ...
    return isSuccessful();
}

public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    // ...
    status = submitter.submitJobInternal(Job.this, cluster);
    // ...
}

JobSubmitter.java

JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {
    // ...
    int maps = writeSplits(job, submitJobDir);
    // ...
}

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    // ...
    int maps = writeNewSplits(job, jobSubmitDir);
    return maps;
}

int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
    // ...
  }

ChainMapContextImpl.java

@Override
  public Class<? extends InputFormat<?, ?>> getInputFormatClass()
      throws ClassNotFoundException {
    return base.getInputFormatClass();
  }

JobContextImpl.java

public Class<? extends InputFormat<?, ?>> getInputFormatClass() 
     throws ClassNotFoundException {
    return (Class<? extends InputFormat<?, ?>>) 
    // Here is the default implementation: TextInputFormat
      conf.getClass(INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
  }
1.1.4. FileInputFormat.java (implements input splitting)

FileInputFormat.java

public List<InputSplit> getSplits(JobContext job) throws IOException {
    // Start a stopwatch to time split generation
    StopWatch sw = new StopWatch().start(); 
    // Defaults to 1 (can be overridden via mapred-site.xml)
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    // Defaults to Long.MAX_VALUE (when not configured in mapred-site.xml)
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);

    // ignoreDirs is false unless
    // mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs is set
    boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
      
    for (FileStatus file: files) {
      if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      Path path = file.getPath();
      long length = file.getLen();
      // Non-empty file: go on to split it
      if (length != 0) {
        // Fetch the file's block locations
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        // Decide whether the file can be split (some compressed formats cannot be)
        if (isSplitable(job, path)) {
          // HDFS block size, 128 MB by default
          long blockSize = file.getBlockSize();
          // Compute the split size --> equals the block size (128 MB) by default
          // Raise minSize via mapreduce.input.fileinputformat.split.minsize
          // Lower maxSize via mapreduce.input.fileinputformat.split.maxsize
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          // private static final double SPLIT_SLOP = 1.1;   // 10% slop
          // The slop keeps a tiny remainder from occupying its own MapTask
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          // Emit the remainder (at most splitSize * 1.1) as the final split
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be split
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", Timetaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }
1.1.5. TextInputFormat.java (implements record reading)

TextInputFormat.java

// The RecordReader's KV types match the Mapper's input types
public RecordReader<LongWritable, Text> 
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    // Read the data line by line
    return new LineRecordReader(recordDelimiterBytes);
  }

CombineFileInputFormat.java (suited to jobs over many small files)
Without it, each small file gets its own MapTask, which wastes cluster resources.

public List<InputSplit> getSplits(JobContext job) 
    throws IOException {
    long minSizeNode = 0;
    long minSizeRack = 0;
    long maxSize = 0;
    Configuration conf = job.getConfiguration();

    // the values specified by setxxxSplitSize() takes precedence over the
    // values that might have been specified in the config
    if (minSplitSizeNode != 0) {
      minSizeNode = minSplitSizeNode;
    } else {
      minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
    }
    if (minSplitSizeRack != 0) {
      minSizeRack = minSplitSizeRack;
    } else {
      minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
    }
    if (maxSplitSize != 0) {
      maxSize = maxSplitSize;
    } else {
      maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
      // If maxSize is not configured, a single split will be generated per
      // node.
    }
    if (minSizeNode != 0 && maxSize != 0 && minSizeNode > maxSize) {
      throw new IOException("Minimum split size pernode " + minSizeNode +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && maxSize != 0 && minSizeRack > maxSize) {
      throw new IOException("Minimum split size per rack " + minSizeRack +
                            " cannot be larger than maximum split size " +
                            maxSize);
    }
    if (minSizeRack != 0 && minSizeNode > minSizeRack) {
      throw new IOException("Minimum split size per node " + minSizeNode +
                            " cannot be larger than minimum split " +
                            "size per rack " + minSizeRack);
    }

    // all the files in input set
    List<FileStatus> stats = listStatus(job);
    List<InputSplit> splits = new ArrayList<InputSplit>();
    if (stats.size() == 0) {
      return splits;    
    }

    // In one single iteration, process all the paths in a single pool.
    // Processing one pool at a time ensures that a split contains paths
    // from a single pool only.
    for (MultiPathFilter onepool : pools) {
      ArrayList<FileStatus> myPaths = new ArrayList<FileStatus>();
      
      // pick one input path. If it matches all the filters in a pool,
      // add it to the output set
      for (Iterator<FileStatus> iter = stats.iterator(); iter.hasNext();) {
        FileStatus p = iter.next();
        if (onepool.accept(p.getPath())) {
          myPaths.add(p); // add it to my output set
          iter.remove();
        }
      }
      // create splits for all files in this pool.
      getMoreSplits(job, myPaths, maxSize, minSizeNode, minSizeRack, splits);
    }

    // create splits for all files that are not in any pool.
    getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

    // free up rackTonodes map
    rackToNodes.clear();
    return splits;    
  }

Driver: set the InputFormat implementation. The two approaches below write to the same Configuration object.

// Set the maximum split size used by CombineTextInputFormat
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304*5);
// Set the concrete InputFormat; internally: conf.setClass(INPUT_FORMAT_CLASS_ATTR, cls, InputFormat.class);
job.setInputFormatClass(CombineTextInputFormat.class);

// Equivalent way to set the InputFormat implementation
Configuration conf = new Configuration();
conf.set("mapreduce.job.inputformat.class","org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat");
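As a rough illustration of why CombineTextInputFormat helps, here is a simplified, hypothetical packing sketch: many small files share splits capped at maxSize instead of each file getting a MapTask. The real getSplits above is node- and rack-aware and differs in detail; this only shows the packing idea:

```java
import java.util.*;

// Hypothetical sketch: greedily pack small files into splits of at most
// maxSize bytes (the real CombineFileInputFormat algorithm is more subtle).
class CombinePackingSketch {
    static List<List<Long>> pack(long[] fileSizes, long maxSize) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long used = 0;
        for (long size : fileSizes) {
            // Start a new split when the current one cannot hold this file.
            if (!current.isEmpty() && used + size > maxSize) {
                splits.add(current);
                current = new ArrayList<>();
                used = 0;
            }
            current.add(size);
            used += size;
        }
        if (!current.isEmpty()) splits.add(current);
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        long[] smallFiles = {2 * mb, 3 * mb, 1 * mb, 4 * mb, 2 * mb};
        // With a 5 MB cap, five small files collapse into three splits,
        // i.e. three MapTasks instead of five.
        System.out.println(pack(smallFiles, 5 * mb).size()); // 3
    }
}
```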
1.2. The Shuffle Mechanism

1.2.1. Map-task-side operations
  • Each map task has an in-memory buffer (100 MB by default) that holds its output. When the buffer is nearly full, its contents are written to disk as a temporary file; when the map task finishes, all of its temporary files are merged into one final output file, which then waits for the reduce tasks to pull it.
  • Spill: writing buffer contents to disk is called a spill. The buffer has a spill threshold, spill.percent (0.8 by default). When the threshold is reached, the map task keeps writing into the remaining memory while a spill thread locks the used portion and sorts the records by key (on the serialized bytes); if the client configured a Combiner, local aggregation happens during the spill.
  • Merge: each spill produces a temporary file; when the map task completes, these files are merged into a single file. This step is called Merge.
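The spill numbers above can be sanity-checked with simple arithmetic. This back-of-envelope sketch assumes output spills in full threshold-sized chunks, which a real MapTask only approximates:

```java
// Rough spill-file count: a 100 MB buffer with spill.percent = 0.8 spills
// about every 80 MB of map output, and each spill is one temp file to merge.
class SpillCountSketch {
    static int spillFiles(long mapOutputBytes, long bufferBytes, double spillPercent) {
        long threshold = (long) (bufferBytes * spillPercent);
        // One spill file each time the threshold fills...
        int spills = (int) (mapOutputBytes / threshold);
        // ...plus a final flush for any leftover output.
        if (mapOutputBytes % threshold != 0) spills++;
        return spills;
    }

    public static void main(String[] args) {
        long mb = 1024 * 1024;
        // 250 MB of map output / 80 MB threshold -> 4 spill files to merge.
        System.out.println(spillFiles(250 * mb, 100 * mb, 0.8)); // 4
    }
}
```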
1.2.2. Reduce-task-side operations
  • Once the map tasks on a node have finished, the corresponding reduce tasks start. In short, this phase continuously pulls (via Fetcher threads) each map task's final output from its node and keeps merging the pieces into the reduce task's input file.
  • Copy: the reduce process starts several copy threads (Fetchers) that pull the map-phase output files over HTTP.
  • Merge: copied data first lands in an in-memory buffer (sized from the JVM heap). If the buffer overflows, the data is spilled just as on the map side (sorted by default, combined if configured), and multiple spill files are merged.

Note: partitioning, sorting, combining, and compression can all be customized; the remaining shuffle steps are best left at their defaults.

1.2.3. Setting the number of partitions

Driver: set the number of partitions (one by default)

// Create the configuration object
Configuration conf = new Configuration();
// Create the Job object
Job job = Job.getInstance(conf);
// Set the number of ReduceTasks
job.setNumReduceTasks(2);
Source walkthrough: how data is partitioned by default

WordCountMapper.java (a custom Mapper; outk and outv are output key/value fields of the class)

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Get the current input line
    String line = value.toString();
    // Split the line into words
    String[] datas = line.split(" ");
    // For each word, populate and emit the output key and value
    for (String data : datas) {
        outk.set(data);
        context.write(outk, outv);
    }
}

TaskInputOutputContext.java (interface defining the output context)

public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException;

ChainMapContextImpl.java (an implementation of the output interface)

 public void write(KEYOUT key, VALUEOUT value) throws IOException,
     InterruptedException {
   output.write(key, value);
 }

MapTask.NewOutputCollector.java (the circular buffer; the number of partitions equals the number of ReduceTasks)

public void write(K key, V value) throws IOException, InterruptedException {
  // partitions = jobContext.getNumReduceTasks();
  collector.collect(key, value, partitioner.getPartition(key, value, partitions));
}

Partitioner.java (partitions the map-side output)

public abstract int getPartition(KEY key, VALUE value, int numPartitions);

HashPartitioner.java (the default Partitioner implementation)

public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
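The HashPartitioner formula can be exercised stand-alone. The bitmask with Integer.MAX_VALUE clears the sign bit, so a negative hashCode() can never yield a negative partition index:

```java
// The HashPartitioner formula in isolation, outside Hadoop.
class HashPartitionDemo {
    static int getPartition(Object key, int numReduceTasks) {
        // & Integer.MAX_VALUE zeroes the sign bit before the modulo.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // Always lands in [0, numReduceTasks):
        System.out.println(getPartition("hadoop", 2) < 2); // true
        // "polygenelubricants" famously hashes to Integer.MIN_VALUE;
        // without the mask, % could return a negative index.
        System.out.println(getPartition("polygenelubricants", 2)); // 0
    }
}
```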
1.2.4. A custom Partitioner

PhonePartitioner.java (partitions records by phone-number prefix)

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;


// The Partitioner's KV types match the Mapper's output KV types
public class PhonePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {

        int phonePartitions;
        // Get the phone number
        String phoneNum = text.toString();
        if (phoneNum.startsWith("136")) {
            phonePartitions = 0;
        } else if (phoneNum.startsWith("137")) {
            phonePartitions = 1;
        } else if (phoneNum.startsWith("138")) {
            phonePartitions = 2;
        } else if (phoneNum.startsWith("139")) {
            phonePartitions = 3;
        } else {
            phonePartitions = 4;
        }
        return phonePartitions;
    }
}

Driver: set the number of partitions and the Partitioner class

Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setNumReduceTasks(5);
job.setPartitionerClass(PhonePartitioner.class);

// Equivalent way to set the Partitioner implementation
Configuration conf = new Configuration();
conf.set("mapreduce.job.partitioner.class","com.atguigu.mr.partitioner.PhonePartitioner");

Note: different ReduceTask counts produce different outcomes:
– ReduceTask count > the number of partitions actually used: empty partition files are created.
– ReduceTask count < the number of partitions actually used (but > 1): the job fails.
– ReduceTask count = 1: all results land in a single output file; see the source below (MapTask.NewOutputCollector.java).

NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  collector = createSortingCollector(job, reporter);
  partitions = jobContext.getNumReduceTasks();
  // Check whether the number of ReduceTasks is greater than 1
  if (partitions > 1) {
    // More than one partition: instantiate the configured Partitioner
    // (custom or default) via reflection
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    // A single ReduceTask: use an anonymous Partitioner (a special
    // implementation) that sends everything to partition 0
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        // 1 - 1 = 0: all map output always goes to the single partition file
        return partitions - 1; 
      }
    };
  }
}

@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
1.2.5. WritableComparable sorting

hadoop3.x MapReduce serialization: custom serializable classes (Writable, WritableComparable)

  • Partial sort: each output file is internally sorted.
  • Total sort: a single sorted output file, achieved by configuring exactly one ReduceTask.
  • Grouping (auxiliary) sort: now de-emphasized; used when the key is a bean.
  • Secondary sort: compareTo compares on two conditions.
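A secondary sort's two-condition compareTo can be sketched in plain Java. FlowBean here is a hypothetical bean with illustrative fields; in a real job it would implement Hadoop's WritableComparable rather than Comparable:

```java
import java.util.*;

// Sketch of "secondary sort": compareTo has a primary and a secondary condition.
class FlowBean implements Comparable<FlowBean> {
    final String phone;
    final long totalFlow;

    FlowBean(String phone, long totalFlow) {
        this.phone = phone;
        this.totalFlow = totalFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // Condition 1: total flow, descending.
        int byFlow = Long.compare(o.totalFlow, this.totalFlow);
        if (byFlow != 0) return byFlow;
        // Condition 2: phone number, ascending, breaks ties.
        return this.phone.compareTo(o.phone);
    }

    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<>(Arrays.asList(
            new FlowBean("137", 100), new FlowBean("136", 100), new FlowBean("138", 300)));
        Collections.sort(beans);
        for (FlowBean b : beans) System.out.println(b.phone + " " + b.totalFlow);
        // 138 first (largest flow); the 100-flow tie is broken by phone: 136 before 137
    }
}
```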
1.2.6. Combiner

Hadoop3.x MapReduce Combiner

  • A Combiner reduces the volume of data shuffled between the Mapper and the Reducer.
  • Combiner's parent class is Reducer.
  • A Combiner runs on the node where its MapTask runs.
  • Using a Combiner must not change the final result.
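The first and last bullets can be illustrated without Hadoop: pre-summing (word, 1) pairs per map task shrinks the number of records shuffled, and because addition is associative the reducer's final sums are unchanged:

```java
import java.util.*;

// Toy illustration of a Combiner's effect on shuffle volume for word count.
class CombinerEffect {
    // Locally aggregate (word, 1) pairs, as a word-count Combiner would.
    static Map<String, Integer> combine(List<String> mapOutputKeys) {
        Map<String, Integer> combined = new TreeMap<>();
        for (String key : mapOutputKeys) {
            combined.merge(key, 1, Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        List<String> mapOutput = Arrays.asList("a", "b", "a", "a", "b", "c");
        Map<String, Integer> combined = combine(mapOutput);
        // 6 (word, 1) records shrink to 3 (word, count) records for the shuffle.
        System.out.println(mapOutput.size() + " -> " + combined.size()); // 6 -> 3
        System.out.println(combined); // {a=3, b=2, c=1}
    }
}
```

This is also why a Combiner must be associative and commutative: averaging, for example, would give a wrong final result if applied per map task.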
1.3. OutputFormat

OutputFormat.java

public abstract RecordWriter<K, V> 
    getRecordWriter(TaskAttemptContext context
                    ) throws IOException, InterruptedException;

public abstract void checkOutputSpecs(JobContext context
                                        ) throws IOException, 
                                                 InterruptedException;

OutputFormat defines the methods needed to write out the final data.
Its subclass FileOutputFormat implements the abstract checkOutputSpecs: it checks that an output directory is set and does not already exist.
Its subclass TextOutputFormat implements the abstract getRecordWriter: it returns the RecordWriter used for the job's output.

Related reading:
Hadoop: custom OutputFormat
FSDataOutputStream (a look at Hadoop's write API)
HDFS client file writes: FSDataOutputStream write operations (part 1)
Hadoop study notes (6): uploading and downloading files with IOUtils
Hadoop (4): InputFormat, OutputFormat, RecordReader, RecordWriter

1.4、Join

注意hadoop和java堆内存分配和使用的区别
hadoop–Reduce Join
Hadoop|Reduce中Iterable迭代器K,V对象复用机制
Hadoop中 Map Join与计数器

Getting the split object inside the Mapper

@Override
protected void setup(Context context) throws IOException, InterruptedException {
   // inputSplit is a FileSplit field of the Mapper; FileSplit exposes
   // e.g. getPath(), which tells you which input file this task is reading
   inputSplit = (FileSplit) context.getInputSplit();
}
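The Reduce Join idea from the linked articles can be sketched in plain Java: the tag below plays the role that the FileSplit's path plays in the setup() method above, marking which input each record came from. All table and field names here are illustrative:

```java
import java.util.*;

// Sketch of a Reduce Join: records from two inputs share a join key;
// the reduce side groups by key and combines product info with orders.
class ReduceJoinSketch {
    // A value tagged with the input it came from ("product" or "order").
    static final class Tagged {
        final String tag, payload;
        Tagged(String tag, String payload) { this.tag = tag; this.payload = payload; }
    }

    // Simulates the reduce phase: input is already grouped by join key.
    static Map<String, List<String>> join(Map<String, List<Tagged>> grouped) {
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, List<Tagged>> e : grouped.entrySet()) {
            String productName = null;
            List<String> orders = new ArrayList<>();
            // Separate the single product record from the order records.
            for (Tagged t : e.getValue()) {
                if (t.tag.equals("product")) productName = t.payload;
                else orders.add(t.payload);
            }
            // Emit one joined record per order.
            for (String order : orders) {
                result.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                      .add(order + "," + productName);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<Tagged>> grouped = new HashMap<>();
        grouped.put("p1", Arrays.asList(new Tagged("product", "Hadoop Book"),
                                        new Tagged("order", "o1001"),
                                        new Tagged("order", "o1002")));
        System.out.println(join(grouped)); // {p1=[o1001,Hadoop Book, o1002,Hadoop Book]}
    }
}
```

In a real Reduce Join, the caching of all values for a key is also where the object-reuse pitfall from the linked article bites: the Iterable hands back the same K,V objects, so they must be copied before being stored.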