本文简单的介绍了mapreduce框架的原理以及工作流程,部分原理性的内容均增加了简单的案例,能够更加直观的理解其中的奥妙,欢迎大家指正。
文章目录
一、概述二、优缺点三、官方WordCount源码解析:
Map/Reduce编程规范:案例: 五、Mappr/Reduce框架原理
5.1、Job提交流程5.2、MapTask流程:5.3、ReduceTask流程:5.4、切片流程,默认按照文本切片,TextInputFormat5.5、序列化
案例: 5.6、Parttition分区
案例:总结: 5.7、Combiner合并
案例: 5.8、Map/Reduce join
案例: 5.8、ETL数据清洗
案例: 5.9、数据压缩 总结
一、概述
Map/Reduce是一个分布式计算框架,基于Map/Reduce程序写出来的程序能够运行在由成千上万的机器组成的集群之上,并且,能高容错性的处理TB/PB级别的数据。
一个Map/Reduce程序会把一个文件分成若干独立的数据块,由map任务进行处理,处理之后将map端的输出结果进行排序后作为reduce端的输入,之后由reduce任务进行计算。通常,map/reduce端的输出结果都会被记录在文件系统中,以便于失败的时候重新进行计算。
Map/Reduce程序一般需要分成2个阶段:Map阶段和Reduce阶段。Map阶段中的各个Map任务是相互独立的,互不干扰。Reduce阶段中的各个Reduce任务是相互独立的,互不干扰,但是Reduce阶段的数据输入依赖于Map阶段的数据输出。
二、优缺点
优点:
- 易于编程:Map/Reduce框架中已经默认实现了大量的逻辑处理,用户只需要关注自身场景下的逻辑业务即可。良好的扩展性:当集群的机器数量无法满足需要处理的任务数量后,可以通过动态的增加集群机器来满足。高容错性:集群中的某一台机器故障后, 可将自身正在处理的任务转移至其他机器上进行处理,以保证任务的完整性。高效性:适用于TB/PB级别的数据计算处理,并且多台机器之间协同处理计算,能够保证高效性。
缺点:
- 不擅长实时计算:Map/Reduce旨在处理海量数据,数据的计算任务可能会运行好长时间。不擅长流式计算:不擅长数据一条一条的输入的计算。不擅长DAG有向无环图计算:不擅长将前一个Map/Reduce任务的执行结果作为后续一个Map/Reduce任务的输入的计算。
public class WordCount {
public WordCount() {
}
// drive的固定写法
public static void main(String[] args) throws Exception {
// 获取hadoop配置对象
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount [...] ");
System.exit(2);
}
// 获取job对象
Job job = Job.getInstance(conf, "word count");
// 设置jar包路径
job.setJarByClass(WordCount.class);
// 设置自定义Mapper类
job.setMapperClass(WordCount.TokenizerMapper.class);
// 设置自定义Combiner类
job.setCombinerClass(WordCount.IntSumReducer.class);
// 设置自定义Reducer类
job.setReducerClass(WordCount.IntSumReducer.class);
// 设置最终输出的key的类型
job.setOutputKeyClass(Text.class);
// 设置最终输出的value的类型
job.setOutputValueClass(IntWritable.class);
// 设置输入文件,从参数中获取,可以有多个
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
// 设置输出文件,从参数中获取
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
// 提交job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
// 自定义Reducer类,必须继承Reducer类
public static class IntSumReducer extends Reducer {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
// 重写reduce方法,编写自定义业务逻辑
public void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
// 定义一个累加器
int sum = 0;
IntWritable val;
// 进到reduce的数据,都是key相同的数据,所以直接循环values进行累加即可得到这个key出现的次数
for(Iterator var5 = values.iterator(); var5.hasNext(); sum += val.get()) {
val = (IntWritable)var5.next();
}
// 将sum封装成IntWritable
this.result.set(sum);
// 一个单词统计已经完成,输出
context.write(key, this.result);
}
}
// 自定义Mapper类,必须继承Mapper类
public static class TokenizerMapper extends Mapper
Map/Reduce编程规范:
Map阶段:
- 用户自定义的Map必须继承org.apache.hadoop.mapred.Mapper类。map的输入输出都是以键值对(K/V)的形式存在,其中输入的键值对(K/V)中,K是当前文件的内容的偏移量,V是文件中的一行数据。输出的K/V依据具体的实际情况来定义。Map阶段的核心业务逻辑写在map()方法中。map()方法对每一个K/V调用一次。
Reduce阶段:
- 用户自定义的Reduce必须继承org.apache.hadoop.mapred.Reducer类。reduce的输入输出都是以键值对(K/V)的形式存在,其中输入的键值对(K/V)是map阶段的输出键值对(K/V)。输出的K/V依据具体的实际情况来定义。Reduce阶段的核心业务逻辑写在reduce()方法中。reduce()方法对每一组相同K的K/V调用一次。
Driver阶段(固定的写法):
- 获取配置信息,获取Job实例对象。指定程序运行的jar包所在的路径。关联Mapper和Reduce业务类。指定Mapper输出的K/V类型。指定最终输出的K/V类型。指定输入输出的文件路径(其中输出文件路径不能存在)。提交作业。
实现wordcount功能。
代码地址:
wordcount功能
五、Mappr/Reduce框架原理
Mapper阶段:自定义业务逻辑。
- maptask获取对应的切片上的数据。数据通过InputFormat读入,默认TextInputFormat,按行读取,每次读取一行内容。
Shuffle阶段:Map阶段之后,Reduce阶段之前的部分,它描述了数据从map task输出到reduce task输入的过程。
1、每一个MapTask输出K/V都会被标记上分区标记后存放在一个环形缓冲区(可以理解为就是一个数组)中,环形缓冲区的默认大小为100M。
2、环形缓冲区分为两部分,一部分用来存放数据索引信息,一部分存放分区后的数据。
3、环形缓冲区有容量上限,所以当环形缓冲区中的内容超过总容量的80%的时候,就会发生溢写。此时会启动一个线程,将环形缓冲区中的数据写到磁盘上。
4、在溢写的过程中,会先对数据的key按照key的索引进行排序,这里用的是快排算法。
5、此时如果在客户端设置了Combiner,那么Combiner还会在数据写出之前进行归并处理,但是Combiner并不是适用于各种场景,需要视情况而定。
6、溢写是由单独线程来完成,不影响往缓冲区写map结果的线程,因为此时还有20%的空间没有数据,写线程会在环形缓冲区中这20%的空间中反向逆写(可以理解为从数据组的最后一个位置向前写数据)。
7、 溢写之后会产生两类文件,一种的索引文件,一种是数据文件,写出的数据也可以进行压缩处理,这样传到reduce端时,可以减少资源消耗。
8、当某个MapTask执行完成后,ReduceTask开始。ReduceTask从MapTask处拉取属于自己分区的数据,拉取下来的数据首先存放在内存中,如果内存存不下,那么就会持久化在磁盘上。
9、ReduceTask对拉取的文件进行归并排序,排序后进行分组处理,相同的Key的数据放在一起,最终将相同key的数据发送到reduce端进行处理。
Reducer阶段:ReduceTask 的个数 = 0,表示没有Reduce阶段,输出的文件个数和MapTask的个数一致。ReduceTask 的个数 默认为1,所以默认输出文件为一个。如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜,比如某个文件中的数据特别多,其他文件中的数据却特别少。
- reduce端的输入是map端的输出结果,mapTask完成之后就会通知reduceTask,reduceTask开始主动拉取数据。每一个reduceTask只拉去属于自己分区的数据,每个分区内部数据有序,但是多个分区就需要再次进行合并排序。进入reduce方法之前,还会按照key进行分组,所以进入reduce的数据都是key相同的数据。最终reduce的输出数据经过OutputFormat写出。
// 自定义Driver中的job提交的入口
final boolean wait = job.waitForCompletion(true);
public boolean waitForCompletion(boolean verbose) throws IOException, InterruptedException, ClassNotFoundException {
if (this.state == Job.JobState.DEFINE) {
// 提交入口
this.submit();
}
// verbose为true,就能在打印更多的日志信息
if (verbose) {
…………………………省略部分代码………………………………
}
// 最终结果中有一个_SUCCESS空文件,就是在这里。
return this.isSuccessful();
}
public void submit() throws IOException, InterruptedException, ClassNotFoundException {
// 这里再次确认job的运行状态
this.ensureState(Job.JobState.DEFINE);
// 新旧FI的API兼容处理
this.setUseNewAPI();
// 获取客户端链接:这里有两种客户端模式,一种LocalCient,一种是YarnClient。如果是在本地运行,那么最终就用的是LocalCient,如果用的是集群模式,那么最终用就是YarnClient。
this.connect();
final JobSubmitter submitter = this.getJobSubmitter(this.cluster.getFileSystem(), this.cluster.getClient());
this.status = (JobStatus)this.ugi.doAs(new PrivilegedExceptionAction() {
public JobStatus run() throws IOException, InterruptedException, ClassNotFoundException {
// job的内部正式的提交流程
return submitter.submitJobInternal(Job.this, Job.this.cluster);
}
});
// job提交完毕之后看,job的运行状态变为RUNNING
this.state = Job.JobState.RUNNING;
LOG.info("The url to track the job: " + this.getTrackingURL());
}
JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {
// 检查输出路径,存在或者为null,都会直接报错。
this.checkSpecs(job);
// 获取job运行的默认配置信息。
Configuration conf = job.getConfiguration();
addMRframeworkToDistributedCache(conf);
// 生成有一个临时文件路径:hadoop所在盘的跟目录下:/tmp/hadoop/mapred/staging/用户名/.staging,这个路径要关注下,最中会在后边拼接一个jobID,在这个路径下,会生成一些必要的文件,比如:job.split(切片信息文件)、job.xml(整个job运行时的配置信息)、jar包(如果是集群模式,会将jar上传到集群上,本地模式的话,不需要提交jar)
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
// 获取本机的IP和本机名称。
InetAddress ip = InetAddress.getLocalHost();
if (ip != null) {
this.submitHostAddress = ip.getHostAddress();
this.submitHostName = ip.getHostName();
// 将本机的IP和本机名设置到job运行配置中。
conf.set("mapreduce.job.submithostname", this.submitHostName);
conf.set("mapreduce.job.submithostaddress", this.submitHostAddress);
}
```java
// 创建jobID,每一个job都有一个唯一的jobID。
JobID jobId = this.submitClient.getNewJobID();
job.setJobID(jobId);
// 前边的临时路径后边增加一个jobID。
Path submitJobDir = new Path(jobStagingArea, jobId.toString());
JobStatus status = null;
JobStatus var25;
try {
// job运行配置中设置一写其他的参数
conf.set("mapreduce.job.user.name", UserGroupInformation.getCurrentUser().getShortUserName());
conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
conf.set("mapreduce.job.dir", submitJobDir.toString());
LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");
…………………………省略部分代码………………………………
// 向集群中提交信息,如果是集群模式,会将jar上传到集群上,本地模式的话,不需要提交jar。
this.copyAndConfigureFiles(job, submitJobDir);
LOG.debug("Creating splits at " + this.jtFs.makeQualified(submitJobDir));
// 文件切片开始
int maps = this.writeSplits(job, submitJobDir);
// 有几个切片就开启几个mapTask
conf.setInt("mapreduce.job.maps", maps);
LOG.info("number of splits:" + maps);
…………………………省略部分代码………………………………
// job.xml(整个job运行时的配置信息)写入临时文件目录
this.writeConf(conf, submitJobFile);
this.printTokens(jobId, job.getCredentials());
…………………………省略部分代码………………………………
return var25;
}
5.2、MapTask流程:
1、自定义Mapper类中的map方法: protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException { …………………………省略部分代码……………………………… // MapTask入口 context.write(text, joinEntity); } 2、最终会进入MapTask中的write方法: @Override public void write(K key, V value) throws IOException, InterruptedException { // 每一条数据都会先标记上分区信息,然后进入环形缓冲区。 collector.collect(key, value, partitioner.getPartition(key, value, partitions)); } 3、最终会进入MapTask中的collect方法: public synchronized void collect(K key, V value, final int partition ) throws IOException { …………………………省略部分代码……………………………… try { // 环形缓冲区中的信息,key要支持序列化 int keystart = bufindex; keySerializer.serialize(key); …………………………省略部分代码……………………………… // 环形缓冲区中的信息,数据要在网络中传输,所以value也要支持序列化 final int valstart = bufindex; valSerializer.serialize(value); …………………………省略部分代码……………………………… // 索引信息存放到kvmeta中 kvmeta.put(kvindex + PARTITION, partition); kvmeta.put(kvindex + KEYSTART, keystart); kvmeta.put(kvindex + VALSTART, valstart); kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend)); // 计算kindex kvindex = (kvindex - Nmeta + kvmeta.capacity()) % kvmeta.capacity(); } catch (MapBufferTooSmallException e) { …………………………省略部分代码……………………………… } } 4、map方法执行完成后,会执行MapTask中的close方法,在close之前,会将环形缓冲区中的数据进行溢写collector.flush() public void close(TaskAttemptContext context ) throws IOException,InterruptedException { try { collector.flush(); } catch (ClassNotFoundException cnf) { throw new IOException("can't find class ", cnf); } collector.close(); } 5、MapTask中的flush方法 public void flush() throws IOException, ClassNotFoundException, InterruptedException { …………………………省略部分代码……………………………… try { …………………………省略部分代码……………………………… // 排序并溢写 sortAndSpill(); } …………………………省略部分代码……………………………… // 合并 mergeParts(); …………………………省略部分代码……………………………… } 6、MapTask中的sortAndSpill方法 private void sortAndSpill() throws IOException, ClassNotFoundException, InterruptedException { // 排序,实际上就是快排逻辑 sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); int spindex = mstart; final IndexRecord rec = new IndexRecord(); final InMemValBytes value = new InMemValBytes(); // 开始循环分区, 将这些分区中的数据写出到split0.out文件中 for (int i = 0; i < partitions; ++i) { …………………………省略部分代码……………………………… } …………………………省略部分代码……………………………… } 7、MapTask中的mergeParts方法 private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException { …………………………省略部分代码……………………………… // 循环分区文件,准备进行合并 for(int i = 0; i < numSpills; i++) { filename[i] = mapOutputFile.getSpillFile(i); finalOutFileSize += rfs.getFileStatus(filename[i]).getLen(); } // 如果分区文件只有一个,那么该分区文件就就是最终的输出文件 if (numSpills == 1) { //the spill is the final output …………………………省略部分代码……………………………… // 还会写出一个spilt0.out.index文件,reduce通过该文件取spilt0.out中的数据 if (indexCacheList.size() == 0) { Path indexFilePath = mapOutputFile.getSpillIndexFile(0); IntermediateEncryptedStream.validateSpillIndexFile( indexFilePath, job); sameVolRename(indexFilePath, indexFileOutput); } else { indexCacheList.get(0).writeToFile(indexFileOutput, job); } return; } …………………………省略部分代码……………………………… } 8、至此,MapTask流程结束,接下来会直接进入ReduceTask流程。
总结:
- 一个Job的Map阶段由客户端提交Job时的切片决定。每一个切片都会分配一个MaopTask,多个MapTask之间并行执行。默认情况下,切片大小等于BlockSize。切片时,不考虑单个整体数据的大小,而是对单个文件逐一进行切片。
1、MapTask流程结束,直接进入ReduceTask流程。
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
// ReduceTask的三个阶段:copy->sort->reduce
if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
// 抓取数据之前的一些初始化工作
initialize(job, getJobID(), reporter, useNewApi);
…………………………省略部分代码………………………………
// 抓取数据之前的一些初始化工作
shuffleConsumerPlugin.init(shuffleContext);
// 完成了copy阶段的工作
rIter = shuffleConsumerPlugin.run();
// sort阶段完成,即将进去reduce阶段。
sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
// 进入reduce阶段。
if (useNewApi) {
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
}
2、在ReduceTask类中的run()方法中,通过shuffleConsumerPlugin.init(shuffleContext)进入到MergeManagerImpl的构造器中
public MergeManagerImpl(TaskAttemptID reduceId, JobConf jobConf,
FileSystem localFS,
LocalDirAllocator localDirAllocator,
Reporter reporter,
CompressionCodec codec,
Class extends Reducer> combinerClass,
CombineOutputCollector combineCollector,
Counters.Counter spilledRecordsCounter,
Counters.Counter reduceCombineInputCounter,
Counters.Counter mergedMapOutputsCounter,
ExceptionReporter exceptionReporter,
Progress mergePhase, MapOutputFile mapOutputFile) {
…………………………省略部分代码………………………………
// 提前开辟内存和磁盘空间,ReduceTask拉取过来的数据,优先存放在内存中,如果内存不足,则存放在磁盘上。
this.inMemoryMerger = createInMemoryMerger();
this.inMemoryMerger.start();
this.onDiskMerger = new OnDiskMerger(this);
this.onDiskMerger.start();
this.mergePhase = mergePhase;
}
3、在ReduceTask类中的run()方法中,通shuffleConsumerPlugin.run()进入到Shuffle的run()方法中
@Override
public RawKeyValueIterator run() throws IOException, InterruptedException {
…………………………省略部分代码………………………………
// ReduceTask开始主动拉取数据
eventFetcher.start();
…………………………省略部分代码………………………………
// 拉取数据结束,即将进入sort阶段。
copyPhase.complete(); // copy is already complete
taskStatus.setPhase(TaskStatus.Phase.SORT);
reduceTask.statusUpdate(umbilical);
}
4、在ReduceTask类中的run()方法中,进入到runNewReducer()方法中,即将开启reduce。
private
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator comparator,
Class keyClass,
Class valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
try {
// 进入reduce的run方法中,即将开启reduce。
reducer.run(reducerContext);
} finally {
trackedRW.close(reducerContext);
}
}
5、在ReduceTask类中的runNewReducer()方法中,进入到Reducer的run方法中,进入自定义的reduce方法。
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
// 自定义的reduce方法
reduce(context.getCurrentKey(), context.getValues(), context);
}
…………………………省略部分代码………………………………
}
6、进入自定义的reduce方法
protected void reduce(SortEntity key, Iterable values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
key.setTotle();
// 写出数据
context.write(NullWritable.get(), key);
}
}
7、从自定义的reduce方法写出数据,默认使用TextOutputFormat的write方法写出数据
public synchronized void write(K key, V value)
throws IOException {
…………………………省略部分代码………………………………
// 先写出key
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
// 写出value,如果自定义了toString方法,这里会使用自定义的toString方法写出数据。
if (!nullValue) {
writeObject(value);
}
out.write(newline);
}
8、bean对象自定义的toString方法。
public String toString() {
return no + 't' + name + 't' + classes + 't' +
chiness + "t" + math +
"t" + english +
"t" + history +
"t" + physics +
"t" + totle;
}
9、至此,reduce段的一次流程完成,如果有多个reduce,将会重复上述流程。
5.4、切片流程,默认按照文本切片,TextInputFormat
public ListgetSplits(JobContext job) throws IOException { StopWatch sw = (new StopWatch()).start(); // 这里最小的切片信息取决于两个参数设置:this.getFormatMinSplitSize(),默认值就是1;mapreduce.input.fileinputformat.split.minsize默认值是0,缺省值是1。 long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); // 这里最大的切片信息取决于参数设置:mapreduce.input.fileinputformat.split.maxsize默认值为null,缺省值为long的最大值。 long maxSize = getMaxSplitSize(job); …………………………省略部分代码……………………………… Path path = file.getPath(); long length = file.getLen(); if (length != 0L) { …………………………省略部分代码……………………………… // 判断文件支不支持切片,即时文件再大,如果不支持切片,那么也就不会进行切片逻辑。 if (this.isSplitable(job, path)) { // 获取块大小,本地模式下默认32M,集群模式下128M或者256M。 long blockSize = file.getBlockSize(); // 切片计算:Math.max(minSize, Math.min(maxSize, blockSize)),所以要想控制切片大小,可以通过控制minSize、maxSize来达到效果,也就是间接设置mapreduce.input.fileinputformat.split.minsize和mapreduce.input.fileinputformat.split.maxsize这两个参数的值。 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; // 如果文件大小是32.1M,这里其实只分一片((double)bytesRemaining / (double)splitSize > 1.1D),因为文件大小必须大于切片的1.1倍。 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } // 切片之后,处理不够1.1倍的文件 if (bytesRemaining != 0L) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } …………………………省略部分代码……………………………… }
总结:
- 程序先找到数据存储目录。开始遍历目录下的每一个文件。遍历到第一个文件时:
- 获取文件大小:long length = file.getLen()。计算切片大小:Math.max(minSize, Math.min(maxSize, blockSize))。默认默认情况下,切片大小等于blocksize=128M。开始切片,形成切片信息:第一个切片0128M,第二个切片128256M……每次切片后,剩余的部分要判断下是否大于切片大小的1.1倍,如果是,继续切片,否则直接划分为一片。将切片信息保存在切片规划文件中。并将切片信息封装在InputSplit中,放在LIst集合中。InputSplit中保存了切片信息的起始位置,结束位置,以及切片所在节点信息等。
hadoop由属于自己的一套序列化的方案,不套用JAVA的序列化的原因有以下几点:
节省空间:JAVA中的序列化属于重量级的序列化,包含大量的信息,而对于hadoop来说,数据传输时,并不需要那么多的信息。传输速率快:hadoop中的序列化内容仅包含了必要信息和少量的检验信息,会提高传输效率。互操作性:经过序列化之后的内容,在不同的语言之间也可以进行序列化和反序列化。
hadoop中bean序列化的步骤:
- 继承Writable接口。重写序列化方法。重写反序列化方法。序列化和反序列的顺序要一致。重写toString()方法。如果bean对象作为key,那么还需要实现Comparable接口,因为hadoop默认要对key能进行排序。
全年级的学生成绩,计算出来每个学生的总成绩。
代码地址:
序列化
5.6、Parttition分区如果设置ReduceTasks的个数大于1,比如2,那么就是2个ReduceTask,默认使用HashPartitioner分区,最终会产生两个文件。
public class HashPartitionerextends Partitioner { public HashPartitioner() { } public int getPartition(K key, V value, int numReduceTasks) { // (key的hash值 & Interger的最大值) % ReduceTasks的个数 return (key.hashCode() & 2147483647) % numReduceTasks; } }
如果不设置ReduceTasks的个数,默认就是1个ReduceTask,使用默认Partitioner分区,分区号就是0。
NewOutputCollector(JobContext jobContext, JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException {
this.collector = MapTask.this.createSortingCollector(job, reporter);
this.partitions = jobContext.getNumReduceTasks();
if (this.partitions > 1) {
this.partitioner = (Partitioner)ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
} else {
this.partitioner = new Partitioner() {
public int getPartition(K key, V value, int numPartitions) {
// 默认分区号计算: 默认的ReduceTasks的个数 - 1
return NewOutputCollector.this.partitions - 1;
}
};
}
}
自定义Partitioner
- 自定义类继承Partitioner类。重写getPartition()方法。在Job中设置自定义Partitioner类。自定义Partitioner类后,需要设置ReduceTasks的个数,必须大于1才能走自定义的Partitioner分区。
全年级的学生成绩,计算出来每个学生的总成绩,并且按照每个班级单独输出一个文件。
代码地址:
Parttition分区
总结:- 如果ReduceTask的数量 > getPartition的结果,则会产生几个多余的空文件part-r-0000X。如果1 < ReduceTask的数量 < getPartition的结果,则会发生有一部分的分区文件没有ReduceTask来处理,报错IOException。如果ReduceTask的数量 = 1,不管有多少个分区,最终只会生成一个文件,因为当ReduceTask的数量 = 1的时候,压根就不会走自定义分区,而是采用默认的分区逻辑,最终只会计算得到一个0号分区。分区只能从0开始,依次累加。
Combiner是Mapper和Reducer之外的一种组件,可有可无。Combiner的父类就是Reduce。Conbiner和Reduce的区别:
- Combiner是在每一个MapTask输出结果之后执行,相当于每一个MapTask的输出结果都会经过一次Combiner。Reduce是接受 的MapTask的输出结果之后执行。
统计每个单词出现的次数,并且要求在reduce前进行combiner处理,减少网络传输带宽。
代码地址:
Combiner合并
5.8、Map/Reduce joinjoin使用于大表和小表关联查询的情况下。join可以发生在mapper阶段,也可以发生在reduce阶段。发生在mapper阶段就可以需要reduce阶段了,即设置ReduceTask 的个数 = 0。在mapper阶段处理,需要将小表缓存到内存中,可以有多个MapTask同时处理。发生在reduce阶段就需要在mapper阶段标记每一个数据属于哪个表,最后在reduce阶段根据标记出料。 案例:
订单文件关联商品文件,最后输出一个包含订单ID、商品名称、订单数量的文件。
代码地址:
Map/Reduce join
5.8、ETL数据清洗数据清洗一般涉及到以下一些内容:
数据缺失值处理去除不需要的字段数据格式判断:比如电话号码、电子邮件等。数据正确性处理:比如个人信息中,按照身份证号码重新覆盖个人年龄等。 案例:
提取出文件中电话号码符合要求的人员信息。
代码地址:
ETL数据清洗
5.9、数据压缩在hadoop中可以在三个地方采用数据压缩。
mapper输入前:输入端采用压缩。输入端采用压缩的时候,无需指定压缩算法,Hadoop会根据文件的后缀名自动采用默认的压缩算法。hadoop中有三种默认的压缩算法:mapper输出之后:reduce输出之后:
案例:
设置最终输出文件为压缩格式。
代码地址:
数据压缩
总结本文简单的介绍了mapreduce框架的原理以及工作流程,部分原理性的内容均增加了简单的案例,能够更加直观的理解其中的奥妙,欢迎大家指正。



