
Getting Started with MapReduce

1. Overview

When working on a distributed computing task, the user only needs to:
implement the business logic and plug it into the APIs provided by the MapReduce framework [which supplies the multithreading + distribution].

2. How MapReduce Runs

When processing a "single" computing task, the only way to accelerate it with multithreading is to split the data, compute the pieces in parallel, and merge the results. [The computing task itself is single and cannot be subdivided.]


3. The MR Computation Flow


Within one MR job, the Mapper object is a singleton, while its map method is called many times (once per input record).
Compared with Java's String, Hadoop's internal Text type has the advantage that the object's content can be modified in place, so one instance can be reused across calls, as the sketch below illustrates.
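To make both points concrete, here is a minimal WordCount-style mapper (a hedged sketch, not code from this article): the framework builds one WordCountMapper per MapTask and then calls map() once per line, so outK/outV are allocated once and mutated via Text.set().

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  // created once per MapTask, reused across every map() call --
  // possible precisely because Text is mutable, unlike String
  private final Text outK = new Text();
  private final IntWritable outV = new IntWritable(1);

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // invoked once per input record (here, one line of text)
    for (String word : value.toString().split("\\s+")) {
      if (word.isEmpty()) continue;
      outK.set(word);  // mutate in place instead of new Text(word)
      context.write(outK, outV);
    }
  }
}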

 
4. The Core of Distributed Computing: Serialization

Hadoop serialization: taking objects in JVM memory and writing them to disk, either for transfer or for permanent storage.


Q: How does this differ from Java's built-in serialization?
A: Java's native serialization (Serializable) is heavyweight: each serialized object drags along extra bookkeeping such as class metadata and inheritance information, which is wasteful when large volumes of records cross the network. Hadoop's Writable format is compact and fast by comparison.

The concrete implementation of serialization and deserialization in Hadoop:


Defining a custom serialization is, in essence, specifying the transfer format for a particular class.



Because distributed computation has to move data between machines, the input and output types of both MapTask and ReduceTask must implement the serialization interface; and to guarantee that results are written to files correctly, the class should also provide a custom toString method.

The basic data types Hadoop provides, such as Text and IntWritable, are all serializable.
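As a minimal sketch of such a custom type (FlowBean here is hypothetical, not from this article): implement Hadoop's Writable interface, keep the field order in write() and readFields() identical, and override toString() to control what lands in the output file.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {
  private long upFlow;
  private long downFlow;

  // the framework instantiates the bean reflectively,
  // so a no-arg constructor is required
  public FlowBean() {}

  public void set(long upFlow, long downFlow) {
    this.upFlow = upFlow;
    this.downFlow = downFlow;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // serialization: defines the wire format of this class
    out.writeLong(upFlow);
    out.writeLong(downFlow);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // deserialization: must read fields in exactly the order written
    upFlow = in.readLong();
    downFlow = in.readLong();
  }

  @Override
  public String toString() {
    // controls how this value is rendered in the output file
    return upFlow + "\t" + downFlow;
  }
}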

5. MapReduce Framework Internals:

    How file data on disk is read into the Context: InputFormat
    1) Splits and MapTask (MT) parallelism:


    Job submission flow (source-code walkthrough):
    Internal method for submitting jobs to the system.

      The job submission process involves:
      1. Checking the input and output specifications of the job.
      2. ***Computing the InputSplits for the job.
      3. Setting up the requisite accounting information for the DistributedCache of the job, if necessary.
      4. ***Copying the job's jar and configuration to the map-reduce system directory on the distributed file-system.
      5. Submitting the job to the JobTracker and optionally monitoring its status.
     Params:
     job – the configuration to submit
     cluster – the handle to the Cluster
    
JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    // validate the output specification (e.g. the output dir must not exist)
    checkSpecs(job);

    Configuration conf = job.getConfiguration();

    // staging (temporary) directory for the submission files
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    // obtain a new job ID
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());

    JobStatus status = null;
    try {
      // upload the job's jar and other configured resources
      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);

      // Write job configuration file to submit dir
      writeConf(conf, submitJobFile);

      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);
      }
    }
  }

When a Job is submitted to the cluster, three things are uploaded: the Jar, the Configuration, and the InputSplits.

    FileInputFormat source-code analysis
public List<InputSplit> getSplits(JobContext job) throws IOException {

    // read minSize and maxSize from the configuration parameters
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    // generate splits: the result list
    List<InputSplit> splits = new ArrayList<InputSplit>();
    // list the files that make up the job's input
    List<FileStatus> files = listStatus(job);

    for (FileStatus file: files) {
      // each file is split independently
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        // fetch the locations of the blocks holding this file
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          // LocatedFileStatus (a subclass of FileStatus) already
          // carries the block locations
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          // otherwise ask the FileSystem (i.e. the NameNode) for them
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }

        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            // find the block containing the current split's start offset
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);

            // record the split (path, start offset, length, host locations)
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            // the tail (at most 1.1 * splitSize) becomes the final split
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          // (elided here: the whole file becomes a single split)
        }
      } else { 
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

    return splits;
  }

(1) When computing the split size:

  protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

// getFormatMinSplitSize() returns 1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);


  public static final String SPLIT_MAXSIZE = 
    "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE = 
    "mapreduce.input.fileinputformat.split.minsize";

That is: take the middle value of minSize, blockSize, and maxSize.
So with blockSize fixed, the way to change the split size is through minSize and maxSize: raise minSize above blockSize to enlarge splits, or lower maxSize below blockSize to shrink them (see the driver sketch below).
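As a driver-side sketch (hypothetical job name and sizes; the two knobs correspond to the SPLIT_MINSIZE/SPLIT_MAXSIZE properties above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "split-size-demo");
    // enlarge splits: a minSize above blockSize becomes the median
    FileInputFormat.setMinInputSplitSize(job, 256L * 1024 * 1024); // 256 MB
    // or shrink splits: a maxSize below blockSize becomes the median
    // FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024); // 64 MB
  }
}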
(2) When splitting a file:

        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

A new split is cut only while the remaining bytes exceed 1.1× the split size (SPLIT_SLOP = 1.1); this guards against producing a tiny trailing split. For example, with a 128 MB split size, a 129 MB file becomes a single 129 MB split (129/128 ≈ 1.008 < 1.1), while a 200 MB file becomes a 128 MB split plus a 72 MB split.

    Subclasses of FileInputFormat

    1) TextInputFormat
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {

    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    // Treats keys as offset in file and value as line.
    return new LineRecordReader(recordDelimiterBytes);
  }
}

    2) KeyValueTextInputFormat

public class KeyValueTextInputFormat extends FileInputFormat<Text, Text> {

  public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
      TaskAttemptContext context) throws IOException {
    
    context.setStatus(genericSplit.toString());
    return new KeyValueLineRecordReader(context.getConfiguration());
  }
}

	  This class treats a line in the input as a key/value pair separated by a separator character.
	  The separator can be specified in the config file under the attribute name
	  mapreduce.input.keyvaluelinerecordreader.key.value.separator.
	  The default separator is the tab character ('\t').
    3) CombineTextInputFormat: packs many small files into a single split, so a directory full of tiny files does not spawn one MapTask per file.


To set the InputFormat on a job (note the new-API method name):
job.setInputFormatClass(...)
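For example, switching a job to KeyValueTextInputFormat and setting the separator (a hedged sketch; the property name comes from the javadoc quoted above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class KvInputDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // separator property, per the description quoted above
    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
    Job job = Job.getInstance(conf, "kv-input-demo");
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    // ... set mapper/reducer, paths, and submit as usual
  }
}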

6. The MapReduce Workflow:



Everything that happens after Map and before Reduce is called the Shuffle.

7. Partitioning
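A hedged sketch of how partitioning can be customized (hypothetical example, reusing the Text/IntWritable types from the mapper sketch in section 3): the default is HashPartitioner, which assigns each key to partition (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; overriding getPartition changes which ReduceTask receives each key.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical: words starting with a-m go to partition 0, the rest to 1.
public class InitialPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String word = key.toString();
    char first = word.isEmpty() ? 'z' : Character.toLowerCase(word.charAt(0));
    return (first >= 'a' && first <= 'm') ? 0 : 1;
  }
}

Register it in the driver with job.setPartitionerClass(InitialPartitioner.class), and set job.setNumReduceTasks(2) so the ReduceTask count matches the number of partitions.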


8. Sorting

After map() completes, the (K, V) pairs enter the ring buffer, where each is first tagged with its partition; once the buffer reaches a threshold, its contents are sorted [sort #1] and written to disk, producing one spill file per spill. When the MapTask finishes, all spill files are merge-sorted [sort #2]. The ReduceTask, after copying the MapTask output files to local disk, performs one more merge sort [sort #3].
The default order is lexicographic (dictionary) order.

Custom sorting: override the compareTo method of the Key class (see the sketch below).
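A minimal sketch (hypothetical key class, in the spirit of the FlowBean from section 4): to sort by a custom rule, the key implements WritableComparable and overrides compareTo.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Hypothetical key sorted by a count, descending.
public class CountKey implements WritableComparable<CountKey> {
  private long count;

  public CountKey() {}                       // framework needs a no-arg ctor
  public void setCount(long count) { this.count = count; }

  @Override
  public int compareTo(CountKey other) {
    // descending: larger counts sort first during the shuffle
    return Long.compare(other.count, this.count);
  }

  @Override
  public void write(DataOutput out) throws IOException { out.writeLong(count); }

  @Override
  public void readFields(DataInput in) throws IOException { count = in.readLong(); }

  @Override
  public String toString() { return String.valueOf(count); }
}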

9. Combiner

The Combine step runs [after sorting] and merges key/value pairs that share the same key; it is optional.
It may happen:

1. when the map output buffer spills to disk
2. when multiple spill files are merged

The relationship between Combiner and Reducer



The Combiner takes the same input types as the Reducer.
Both process the set of Values sharing one key.

So, if the Combiner and the Reducer would perform the same computation, you can register the Reducer class directly as the Combiner (one-line registration shown below).
This reveals the Combiner's real value: it runs part of the Reducer's computation early, on the map side, offloading work from the Reducer and shrinking the data shuffled across the network.
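In the driver, that registration is one line (a hedged sketch; WordCountReducer is the hypothetical reducer paired with the mapper from section 3):

// safe here because summing partial counts, then summing the sums,
// gives the same result as summing everything at once; the operation
// must be associative/commutative with identical input/output types
job.setCombinerClass(WordCountReducer.class);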

10. OutputFormat

public class AnswerOutputFormat extends FileOutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        return new RecordWriter<Text, IntWritable>() {
            FileSystem fileSystem = FileSystem.get(job.getConfiguration());
            FSDataOutputStream ans = fileSystem.create(new Path(
                    "C:\\Users\\Juyi\\Desktop\\WordCountdemo\\data\\Output11\\answer.txt"), false);
            FSDataOutputStream noans = fileSystem.create(new Path(
                    "C:\\Users\\Juyi\\Desktop\\WordCountdemo\\data\\Output11\\noanswer.txt"), false);

            @Override
            public void write(Text key, IntWritable value) throws IOException, InterruptedException {
                // one record per line: key <TAB> value
                String toWrite = key + "\t" + value + "\n";
                if (key.toString().equals("answers")) {
                    ans.writeBytes(toWrite);
                } else {
                    noans.writeBytes(toWrite);
                }
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException, InterruptedException {
                IOUtils.closeStream(ans);
                IOUtils.closeStream(noans);
            }
        };
    }
}


//        FileOutputFormat.setOutputPath is still required here, so the
//        framework has somewhere to write the _SUCCESS marker
        FileOutputFormat.setOutputPath(job, new Path("...."));

//        register the custom OutputFormat
        job.setOutputFormatClass(AnswerOutputFormat.class);

11. How MapTask and ReduceTask Work


12. Setting Reduce Parallelism
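While MapTask parallelism is determined by the number of input splits, ReduceTask parallelism is set explicitly in the driver (a hedged one-line sketch below); the default HashPartitioner then spreads keys across ReduceTasks by (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, so the chosen number must match any custom Partitioner.

// in the driver: run 2 ReduceTasks; setting 0 disables the Reduce
// phase entirely and makes this a map-only job
job.setNumReduceTasks(2);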




Shuffle ring-buffer source-code walkthrough (the collect method); reference:

https://cloud.tencent.com/developer/article/1580681
public synchronized void collect(K key, V value, final int partition
                                     ) throws IOException {
      reporter.progress();
      if (key.getClass() != keyClass) {
        throw new IOException("Type mismatch in key from map: expected "
                              + keyClass.getName() + ", received "
                              + key.getClass().getName());
      }
      if (value.getClass() != valClass) {
        throw new IOException("Type mismatch in value from map: expected "
                              + valClass.getName() + ", received "
                              + value.getClass().getName());
      }
      if (partition < 0 || partition >= partitions) {
        throw new IOException("Illegal partition for " + key + " (" +
            partition + ")");
      }
      checkSpillException();
      bufferRemaining -= METASIZE;
      if (bufferRemaining <= 0) {
        // start spill if the thread is not running and the soft limit has been
        // reached
        spillLock.lock();
        try {
          do {
            if (!spillInProgress) {
              final int kvbidx = 4 * kvindex;
              final int kvbend = 4 * kvend;
              // serialized, unspilled bytes always lie between kvindex and
              // bufindex, crossing the equator. Note that any void space
              // created by a reset must be included in "used" bytes
              final int bUsed = distanceTo(kvbidx, bufindex);
              final boolean bufsoftlimit = bUsed >= softLimit;
              if ((kvbend + METASIZE) % kvbuffer.length !=
                  equator - (equator % METASIZE)) {
                // spill finished, reclaim space
                resetSpill();
                bufferRemaining = Math.min(
                    distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                    softLimit - bUsed) - METASIZE;
                continue;
              } else if (bufsoftlimit && kvindex != kvend) {
                // spill records, if any collected; check latter, as it may
                // be possible for metadata alignment to hit spill pcnt
                startSpill();
                final int avgRec = (int)
                  (mapOutputByteCounter.getCounter() /
                  mapOutputRecordCounter.getCounter());
                // leave at least half the split buffer for serialization data
                // ensure that kvindex >= bufindex
                final int distkvi = distanceTo(bufindex, kvbidx);
                final int newPos = (bufindex +
                  Math.max(2 * METASIZE - 1,
                          Math.min(distkvi / 2,
                                   distkvi / (METASIZE + avgRec) * METASIZE)))
                  % kvbuffer.length;
                setEquator(newPos);
                bufmark = bufindex = newPos;
                final int serBound = 4 * kvend;
                // bytes remaining before the lock must be held and limits
                // checked is the minimum of three arcs: the metadata space, the
                // serialization space, and the soft limit
                bufferRemaining = Math.min(
                    // metadata max
                    distanceTo(bufend, newPos),
                    Math.min(
                      // serialization max
                      distanceTo(newPos, serBound),
                      // soft limit
                      softLimit)) - 2 * METASIZE;
              }
            }
          } while (false);
        } finally {
          spillLock.unlock();
        }
      }

      try {
        // serialize key bytes into buffer
        int keystart = bufindex;
        keySerializer.serialize(key);
        if (bufindex < keystart) {
          // wrapped the key; must make contiguous
          bb.shiftBufferedKey();
          keystart = 0;
        }
        // serialize value bytes into buffer
        final int valstart = bufindex;
        valSerializer.serialize(value);
        // It's possible for records to have zero length, i.e. the serializer
        // will perform no writes. To ensure that the boundary conditions are
        // checked and that the kvindex invariant is maintained, perform a
        // zero-length write into the buffer. The logic monitoring this could be
        // moved into collect, but this is cleaner and inexpensive. For now, it
        // is acceptable.
        bb.write(b0, 0, 0);

        // the record must be marked after the preceding write, as the metadata
        // for this record are not yet written
        int valend = bb.markRecord();

        mapOutputRecordCounter.increment(1);
        mapOutputByteCounter.increment(
            distanceTo(keystart, valend, bufvoid));

        // write accounting info
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
        // advance kvindex
        kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
      } catch (MapBufferTooSmallException e) {
        LOG.info("Record too large for in-memory buffer: " + e.getMessage());
        spillSingleRecord(key, value, partition);
        mapOutputRecordCounter.increment(1);
        return;
      }
    }
13. Hadoop Compression
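A hedged sketch of the common compression switches (assuming the Snappy codec is available on the cluster): compressing map output speeds up the shuffle, and compressing the final output shrinks what lands on HDFS.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompressionDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // compress intermediate map output (shuffle traffic)
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.setClass("mapreduce.map.output.compress.codec",
        SnappyCodec.class, CompressionCodec.class);

    Job job = Job.getInstance(conf, "compression-demo");
    // compress the job's final output on HDFS
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
  }
}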






