上周我们将对InputFormat的几个重要子类,如FileInputFormat等展开了详细的分析,在了解了InputFormat的源码并分析其的功能的基础上,进一步分析了输入类FileInputFormat(切片)及其4个实现类(kv)的用法。本次我们将继续对核心源码的分析,首先从org.apache.hadoop.mapreduce.InputSplit开始。
org.apache.hadoop.mapreduce.InputSplit源码分析首先附上文件的源代码:
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.mapred.SplitLocationInfo;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class InputSplit {
public abstract long getLength() throws IOException, InterruptedException;
public abstract
String[] getLocations() throws IOException, InterruptedException;
@Evolving
public SplitLocationInfo[] getLocationInfo() throws IOException {
return null;
}
}
InputSplilt的主要作用是什么呢?我们可以查看官方给出的api。
大概意思是InputSplit意为输入分片,表示要由个人处理的数据Mapper,通常情况下,它在输入上呈现面向字节的视图,并且RecordReader作业负责处理此内容并呈现面向记录的视图。
进一步来看,在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
其工作原理如下:
Hadoop 2.x默认的block大小是128MB,Hadoop 1.x默认的block大小是64MB,可以在hdfs-site.xml中设置dfs.block.size,注意单位是byte。
分片大小范围可以在mapred-site.xml中设置,mapred.min.split.size mapred.max.split.size,minSplitSize大小默认为1B,maxSplitSize大小默认为Long.MAX_VALUE = 9223372036854775807。
那么分片到底是多大呢?
minSize=max{minSplitSize,mapred.min.split.size}
maxSize=mapred.max.split.size
splitSize=max{minSize,min{maxSize,blockSize}}
我们再来看一下源码:
所以在我们没有设置分片的范围的时候,分片大小是由block块大小决定的,和它的大小一样。比如把一个258MB的文件上传到HDFS上,假设block块大小是128MB,那么它就会被分成三个block块,与之对应产生三个split,所以最终会产生三个map task。我又发现了另一个问题,第三个block块里存的文件大小只有2MB,而它的block块大小是128MB,那它实际占用Linux file system的多大空间?
答案是实际的文件大小,而非一个块的大小。
在网上查询到已经有人验证这个答案了:http://blog.csdn.net/samhacker/article/details/23089157
hadooop提供了一个设置map个数的参数mapred.map.tasks,我们可以通过这个参数来控制map的个数。但是通过这种方式设置map的个数,并不是每次都有效的。原因是mapred.map.tasks只是一个hadoop的参考数值,最终map的个数,还取决于其他的因素。
为了方便介绍,先来看几个名词:
block_size : hdfs的文件块大小,默认为64M,可以通过参数dfs.block.size设置
total_size : 输入文件整体的大小
input_file_num : 输入文件的个数
(1)默认map个数
如果不进行任何设置,默认的map个数是和blcok_size相关的。
default_num = total_size / block_size;
(2)期望大小
可以通过参数 mapred.map.tasks来设置程序员期望的map个数,但是这个个数只有在大于default_num的时候,才会生效。
goal_num = mapred.map.tasks;
(3)设置处理的文件大小
可以通过mapred.min.split.size 设置每个task处理的文件大小,但是这个大小只有在大于 block_size的时候才会生效。
split_size = max( mapred.min.split.size, block_size );
split_num = total_size / split_size;
(4)计算的map个数
compute_map_num = min(split_num, max(default_num, goal_num))
除了这些配置以外,mapreduce还要遵循一些原则。
mapreduce的每一个map处理的数据是不能跨越文件的,也就是说min_map_num >= input_file_num。 所以,最终的map个数应该为:
final_map_num = max(compute_map_num, input_file_num)
经过以上的分析,在设置map个数的时候,可以简单的总结为以下几点:
(1)如果想增加map个数,则设置mapred.map.tasks 为一个较大的值。
(2)如果想减小map个数,则设置mapred.min.split.size 为一个较大的值。
(3)如果输入中有很多小文件,依然想减少map个数,则需要将小文件merger为大文件,然后使用准则2。
接下来我们继续对源码进行分析。
org.apache.hadoop.mapreduce.Mapper源码分析package org.apache.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.mapreduce.task.MapContextImpl; @InterfaceAudience.Public @InterfaceStability.Stable public class Mapper{ public abstract class Context implements MapContext { } protected void setup(Context context ) throws IOException, InterruptedException { // NOTHING } @SuppressWarnings("unchecked") protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException { context.write((KEYOUT) key, (VALUEOUT) value); } protected void cleanup(Context context ) throws IOException, InterruptedException { // NOTHING } public void run(Context context) throws IOException, InterruptedException { setup(context); try { while (context.nextKeyValue()) { map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { cleanup(context); } } }
从源码中我们可以看出,Mapper类里总共包含四个方法,一个抽象类。
1.setup()方法—一般作为map()方法的准备工作,如进行相关配置文件的读取、参数的传递;
2.cleanup()方法—是用来做一些收尾工作,如关闭文件、Key-Value的分发;
3.map()方法—是真正的程序逻辑部分,如对一行文本的split、filter处理之后,将数据以key-Value的形式写入context;
4.run()方法—是驱动整个Mapper执行的一个方法,按照run()>>setup()>>map()>>cleanup()顺序执行;
5.Context抽象类—是Mapper里的一个内部抽象类,主要是为了在Map任务或者Reduce任务中跟踪task的相关状态和数据的存放。如Context可以存储一些jobConf有关的信息,在setup()方法中,就可以用context读取相关的配置信息,以及作为key-Value数据的载体。
同时,mapper有许多的子类。
如图为Mapper以及它的一些子类的类图(Mapper一共有九个子类。我们挑了其中的4个子类来做分析)。
package org.apache.hadoop.mapreduce.lib.map; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.mapreduce.Mapper; @InterfaceAudience.Public @InterfaceStability.Stable public class InverseMapperextends Mapper { @Override public void map(K key, V value, Context context ) throws IOException, InterruptedException { context.write(value, key); } }
这个类很简单,只是将Key-Value进行反转,然后直接分发,如面包-生产商,我们既可以统计某一种面包来自多少个生产商,也可以统计一个生产商生产多少种面包。不同的需求,利用InverseMapper可以达到不同的效果。
子类TokenCounterMapperpackage org.apache.hadoop.mapreduce.lib.map; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; @InterfaceAudience.Public @InterfaceStability.Stable public class TokenCounterMapper extends Mapper
这个类使用StringTokenizer来获取value中的tokens(在StringTokenizer的构造函数中将value按照“tntf”进行分割),然后对每一个token,分发出一个
package org.apache.hadoop.mapreduce.lib.map; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; @InterfaceAudience.Public @InterfaceStability.Stable public class TokenCounterMapper extends Mapper
这个类其实就是将wordcount进行了正则化,匹配自己需要格式的word进行统计。
子类MultithreadedMapperpackage org.apache.hadoop.mapreduce.lib.map; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.MapContext; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.StatusReporter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.task.MapContextImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; @InterfaceAudience.Public @InterfaceStability.Stable public class MultithreadedMapperextends Mapper { private static final Logger LOG = LoggerFactory.getLogger(MultithreadedMapper.class); public static String NUM_THREADS = "mapreduce.mapper.multithreadedmapper.threads"; public static String MAP_CLASS = "mapreduce.mapper.multithreadedmapper.mapclass"; private Class extends Mapper > mapClass; private Context outer; private List runners; public static int getNumberOfThreads(JobContext job) { return job.getConfiguration().getInt(NUM_THREADS, 10); } public static void setNumberOfThreads(Job job, int threads) { job.getConfiguration().setInt(NUM_THREADS, threads); } @SuppressWarnings("unchecked") public static Class > getMapperClass(JobContext job) { return (Class >) job.getConfiguration().getClass(MAP_CLASS, Mapper.class); } public static void setMapperClass(Job job, Class extends Mapper > cls) { if (MultithreadedMapper.class.isAssignableFrom(cls)) { throw new IllegalArgumentException("Can't have recursive " + "MultithreadedMapper instances."); } job.getConfiguration().setClass(MAP_CLASS, cls, Mapper.class); } @Override public void run(Context context) throws IOException, InterruptedException { outer = context; int numberOfThreads = getNumberOfThreads(context); mapClass = getMapperClass(context); if (LOG.isDebugEnabled()) { LOG.debug("Configuring multithread runner to use " + numberOfThreads + " threads"); } runners = new ArrayList (numberOfThreads); for(int i=0; i < numberOfThreads; ++i) { MapRunner thread = new MapRunner(context); thread.start(); runners.add(i, thread); } for(int i=0; i < numberOfThreads; ++i) { MapRunner thread = runners.get(i); thread.join(); Throwable th = thread.throwable; if (th != null) { if (th instanceof IOException) { throw (IOException) th; } else if (th instanceof InterruptedException) { throw (InterruptedException) th; } else { throw new RuntimeException(th); } } } } private class SubMapRecordReader extends RecordReader { private K1 key; private V1 value; private Configuration conf; @Override public void close() throws IOException { } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void initialize(InputSplit split, TaskAttemptContext context ) throws IOException, InterruptedException { conf = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { synchronized (outer) { if (!outer.nextKeyValue()) { return false; } key = ReflectionUtils.copy(outer.getConfiguration(), outer.getCurrentKey(), key); value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value); return true; } } public K1 getCurrentKey() { return key; } @Override public V1 getCurrentValue() { return value; } } private class SubMapRecordWriter extends RecordWriter { @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { } @Override public void write(K2 key, V2 value) throws IOException, InterruptedException { synchronized (outer) { outer.write(key, value); } } } private class SubMapStatusReporter extends StatusReporter { @Override public Counter getCounter(Enum> name) { return outer.getCounter(name); } @Override public Counter getCounter(String group, String name) { return outer.getCounter(group, name); } @Override public void progress() { outer.progress(); } @Override public void setStatus(String status) { outer.setStatus(status); } @Override public float getProgress() { return outer.getProgress(); } } private class MapRunner extends Thread { private Mapper mapper; private Context subcontext; private Throwable throwable; private RecordReader reader = new SubMapRecordReader(); MapRunner(Context context) throws IOException, InterruptedException { mapper = ReflectionUtils.newInstance(mapClass, context.getConfiguration()); MapContext mapContext = new MapContextImpl (outer.getConfiguration(), outer.getTaskAttemptID(), reader, new SubMapRecordWriter(), context.getOutputCommitter(), new SubMapStatusReporter(), outer.getInputSplit()); subcontext = new WrappedMapper ().getMapContext(mapContext); reader.initialize(context.getInputSplit(), context); } @Override public void run() { try { mapper.run(subcontext); reader.close(); } catch (Throwable ie) { throwable = ie; } } } }
这个类使用多线程来执行Mapper(它由mapreduce.mapper.multithreadedmapper.mapclass设置)。该类重写了run()方法,首先设置运行上下文context和workMapper,然后启动多个MapRunner(内部类)子线程(由mapred.map.multithreadedrunner.threads 设置 ),最后使用join()等待子线程执行完毕。
MapRunner(内部类)继承了Thread,拥有独立的Context去执行Mapper,并进行异常处理。从MapRunner的Constructor中我们看见,它使用了独立的SubMapRecordReader、SubMapRecordWriter和SubMapStatusReporter。SubMapRecordReader在读Key-Value和SubMapRecordWriter在写Key-Value的时候都要同步。这是通过互斥访问MultithreadedMapper的上下文outer来实现的。
MultithreadedMapper可以充分利用CPU,采用多个线程处理后,一个线程可以同时在另一个线程执行的时候读取数据并执行,这样就使用了更多的CPU周期来执行任务,从而提高吞吐率(注意读写操作都是线程安全的)。但对于IO密集型的作业,采用MultithreadedMapper会适得其反,因为会有多个线程等待IO,IO便成为限制吞吐率的关键,这时候我们可以通过增多task数量的方法来解决,因为这样在IO上就是并行的。
总结本次我们首先对输入分片InputSplit进行了分析,对其分片大小进行了进一步的探究。随后又展开了对与mapreduce中的核心类mapper的分析,在了解其作用的基础上,又对其几个重要的子类做了源码分析,为之后的研究打下了基础。



