
The MapReduce execution mechanism: job submission and input-split principles


1、The Mapper class

Maps input key/value pairs to a set of intermediate key/value pairs.

Maps are the individual tasks which transform input records into intermediate records. The transformed intermediate records need not be of the same type as the input records. A given input pair may map to zero or many output pairs.

The Hadoop Map-Reduce framework spawns one map task for each {@link InputSplit} generated by the {@link InputFormat} for the job. Mapper implementations can access the {@link Configuration} for the job via {@link JobContext#getConfiguration()}.

The framework first calls {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by {@link #map(Object, Object, org.apache.hadoop.mapreduce.Mapper.Context)} for each key/value pair in the InputSplit. Finally {@link #cleanup(org.apache.hadoop.mapreduce.Mapper.Context)} is called.

All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to a {@link Reducer} to determine the final output. Users can control the sorting and grouping by specifying two key {@link RawComparator} classes.

The Mapper outputs are partitioned per Reducer. Users can control which keys (and hence records) go to which Reducer by implementing a custom {@link Partitioner}.

Users can optionally specify a combiner, via {@link Job#setCombinerClass(Class)}, to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.

Applications can specify if and how the intermediate outputs are to be compressed, and which {@link CompressionCodec}s are to be used, via the Configuration.

If the job has zero reduces then the output of the Mapper is directly written to the {@link OutputFormat} without sorting by keys.

Example:

    public class TokenCounterMapper
        extends Mapper<Object, Text, Text, IntWritable> {

      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();

      public void map(Object key, Text value, Context context)
          throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
          word.set(itr.nextToken());
          context.write(word, one);
        }
      }
    }

Applications may override the {@link #run(org.apache.hadoop.mapreduce.Mapper.Context)} method to exert greater control on map processing, e.g. multi-threaded Mappers etc.

In short, on the Mapper side:

Mapper call flow: the framework first calls setup(Context), then map(...) once for each key/value pair in the InputSplit, and finally cleanup(Context).

1. Partitioner partitioning: Mapper outputs are partitioned per Reducer; a custom Partitioner controls which keys (and hence records) go to which Reducer.

2. Combiner pre-aggregation: an optional combiner, set via Job#setCombinerClass(Class), performs local aggregation of the intermediate outputs and cuts down the amount of data transferred from the Mapper to the Reducer.

3. Compression: the Configuration controls whether and how intermediate outputs are compressed and which CompressionCodecs are used.
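These three knobs are all wired up on the Job object in the driver. Below is a minimal sketch (not from the original article) assuming a word-count style job; the commented-out lines stand in for application-supplied classes, whose names here are hypothetical:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class MapSideKnobs {
  public static Job configure(Configuration conf) throws Exception {
    // 3. Compression: compress map outputs (set before Job copies the conf).
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);

    Job job = Job.getInstance(conf, "word count");
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // 1. Partitioner: route keys to reducers (HashPartitioner is the default):
    // job.setPartitionerClass(MyPartitioner.class);  // hypothetical class

    // 2. Combiner: local aggregation; a Reducer class is commonly reused here
    //    when the operation is associative and commutative (e.g. summing):
    // job.setCombinerClass(IntSumReducer.class);     // hypothetical class

    return job;
  }
}
```

Compressing and pre-aggregating map output trades a little CPU for much less shuffle traffic, which is usually the bottleneck.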

Mapper core invocation

2、The Reducer class

Reduces a set of intermediate values which share a key to a smaller set of values.

Reducer implementations can access the {@link Configuration} for the job via the {@link JobContext#getConfiguration()} method.

Reducer has 3 primary phases:

1. Shuffle

   The Reducer copies the sorted output from each {@link Mapper} using HTTP across the network.

2. Sort

   The framework merge sorts Reducer inputs by keys (since different Mappers may have output the same key).

   The shuffle and sort phases occur simultaneously, i.e. while outputs are being fetched they are merged.

   SecondarySort

   To achieve a secondary sort on the values returned by the value iterator, the application should extend the key with the secondary key and define a grouping comparator. The keys will be sorted using the entire key, but will be grouped using the grouping comparator to decide which keys and values are sent in the same call to reduce. The grouping comparator is specified via {@link Job#setGroupingComparatorClass(Class)}. The sort order is controlled by {@link Job#setSortComparatorClass(Class)}.

3. Reduce

   In this phase the reduce method is called for each <key, (collection of values)> pair in the grouped, sorted inputs.

    In short: a Reducer reduces a set of intermediate values that share a key to a smaller set of values, and its implementations can read the job's Configuration via JobContext#getConfiguration(). Its phases are Shuffle (copy each Mapper's sorted output over HTTP), Sort (merge-sort the inputs by key while they are still being fetched, since different Mappers may have emitted the same key), and optionally SecondarySort: extend the key with the secondary key, sort by the entire composite key via Job#setSortComparatorClass(Class), and group via Job#setGroupingComparatorClass(Class) so that all values for one natural key are sent to the same reduce call.
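The secondary-sort contract can be illustrated without Hadoop at all: sort by the full composite key, but group reduce calls by the natural key only. The sketch below (names and the composite-key layout are illustrative, not from the article) mimics what the two comparators registered via Job#setSortComparatorClass and Job#setGroupingComparatorClass achieve:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;

// Dependency-free illustration of the secondary-sort semantics: records are
// SORTED by (naturalKey, secondaryKey) but GROUPED into one reduce call per
// natural key, so each group's values arrive in secondary-key order.
public class SecondarySortDemo {
  // Each record is {naturalKey, secondaryKey, value}.
  public static LinkedHashMap<String, List<String>> shuffleSortGroup(
      List<String[]> records) {
    // Like the sort comparator: order by the entire composite key.
    records.sort(Comparator
        .comparing((String[] r) -> r[0])
        .thenComparing(r -> r[1]));
    // Like the grouping comparator: one group per natural key.
    LinkedHashMap<String, List<String>> groups = new LinkedHashMap<>();
    for (String[] r : records) {
      groups.computeIfAbsent(r[0], k -> new ArrayList<>()).add(r[2]);
    }
    return groups;
  }
}
```

In real Hadoop code the two roles are played by WritableComparator subclasses; the point is only that sorting and grouping use different comparisons over the same composite key.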

Reducer core invocation

Overall processing flow

Job submission execution flow
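The call chain traced in this section starts from the user's driver: waitForCompletion() calls submit(), which connects to the cluster, picks a ClientProtocol, and hands off to JobSubmitter.submitJobInternal(). A minimal driver sketch (a bare-bones word-count-style job; Mapper/Reducer wiring is omitted, and the input/output paths come from the command line):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubmitSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(SubmitSketch.class);
    // setMapperClass/setReducerClass would go here for a real job.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion -> submit() -> connect() (choose the ClientProtocol:
    // YARN or local) -> JobSubmitter.submitJobInternal(), which uploads job
    // resources and computes the input splits.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```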









The ClientProtocol implementation is determined by comparison



The following screenshot shows the contents of submitJobInternal

1、Upload the jar to a temporary directory on the cluster







These jars, files, and archives are uploaded to the temporary path above.

In my case there are no jars or archives, so these are empty here.

2、Logically split the input files according to the job


Key point 1: which InputFormat is used to read the data


public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();

    // minSize defaults to 1, maxSize defaults to Long.MAX_VALUE
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits
    List<InputSplit> splits = new ArrayList<>();
    List<FileStatus> files = listStatus(job);

    // ...

    // iterate over every file under the input paths
    for (FileStatus file: files) {

      if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }

        // check whether the file can be split
        if (isSplitable(job, path)) {
          // block size: 32 MB by default on the local file system,
          // 128 MB by default on a cluster (configurable)
          long blockSize = file.getBlockSize();
          // compute the split size from blockSize, minSize and maxSize
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          // bytes left in the file
          long bytesRemaining = length;
          // keep splitting while remaining bytes / splitSize > 1.1 (SPLIT_SLOP)
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            // fix the split and its end offset; e.g. for a 40 MB file with a
            // 32 MB split size, take the first 32 MB as one split (8 MB remain)
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be splitted
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else {
        //Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", Timetaken: " + sw.now(TimeUnit.MILLISECONDS));
    }

    // return the list of splits
    return splits;
  }
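The split arithmetic above can be checked in isolation. The following dependency-free sketch re-implements the two formulas (computeSplitSize is max(minSize, min(maxSize, blockSize)) in FileInputFormat, and SPLIT_SLOP is 1.1); the file sizes in the usage note are illustrative:

```java
// Stand-alone re-implementation of the two formulas used by getSplits().
public class SplitMath {
  static final double SPLIT_SLOP = 1.1; // same constant as FileInputFormat

  // splitSize = max(minSize, min(maxSize, blockSize))
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  // Number of splits getSplits() would create for one splittable file:
  // keep carving off splitSize while remaining/splitSize > SPLIT_SLOP,
  // then emit one final split for whatever is left.
  static int countSplits(long fileLength, long splitSize) {
    int splits = 0;
    long bytesRemaining = fileLength;
    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
      splits++;
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splits++; // final tail split
    }
    return splits;
  }
}
```

With a 128 MB split size, a 129 MB file yields a single split (129/128 ≈ 1.008 ≤ 1.1), while a 200 MB file yields two; the 10% slop avoids creating a tiny trailing split.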



After the job is submitted, a YarnRunner or LocalRunner runs it, calling map.run so that the MapTask executes.



And this jobRunner is exactly the ClientProtocol we identified earlier.

Please credit www.mshxw.com when reposting this article.

Copyright (c) 2021-2022 MSHXW.COM

ICP filing number: 晋ICP备2021003244-6号