Hadoop中有一个抽象类是InputFormat,InputFormat抽象类是MapReduce输入数据的顶层基类,这个抽象类中只定义了两个方法
一个是getSplits方法 另一个是createRecordReader方法
这个抽象类下面有三个子继承类
DBInputFormat是操作数据库的, FileInputFormat是操作文件类型数据的, DelegatingInputFormat是用在处理多个输入时使用的
这里面比较常见的也就是FileInputFormat了,FileInputFormat是所有以文件作为数据源的基类,FileInputFormat保存job输入的所有文件,并实现了对输入文件计算splits的方法,至于获得文件中数据的方法是由子类实现的。
FileInputFormat下面还有一些子类:
CombineFileInputFormat:处理小文件问题的,后面我们再详细分析 TextInputFormat:是默认的处理类,处理普通文本文件,他会把文件中每一行作为一个记录,将每一行的起始偏移量作为key,每一行的内容作为value,这里的key和value就是我们之前所说的k1,v1 它默认以换行符或回车键作为一行记录 NLineInputFormat:可以动态指定一次读取多少行数据 这里面的TextInputFormat是我们处理文本数据的默认处理类,TextInputFormat的顶层基类是InputFormat,下面我们先来看一下这个抽象类的源码1、下载源码
下载hadoop的源码:
https://archive.apache.org/dist/hadoop/common/hadoop-3.2.0/hadoop-3.2.0-src.tar.gz2、解压导入
解压到指定目录下(D:IdeaProjects),然后找到里面的hadoop-mapreduce-client-core子项目,我们要分析的内容都在这里面。
D:IdeaProjectshadoop-3.2.0-srchadoop-mapreduce-projecthadoop-mapreduce-clienthadoop-mapreduce-client-core
将这个项目导入到idea中,这是一个maven项目,导入进去之后项目会自动开始下载依赖包,一直等依赖包下载完成即可,但是这将会是一个漫长的过程,有可能下载一会进度条就一动不动了,这都是很正常的现象,建议大家修改maven官方中央仓库的地址,改为国内镜像地址,这样速度会很快,但是注意了,改为国内镜像地址以后,有可能会遇到个别依赖下载不下来的情况,这也是很正常的现象,
所以说,maven官方中央仓库的特点是:依赖最全,但是下载速度贼慢
国内镜像仓库的特点是:下载速度贼快,但是可能会缺失部分依赖,我是遇到过这种问题的。
分析下来发现都不靠谱,所以最终给大家一个折中方案,先使用国内镜像仓库下载,将能下载的依赖包全部都下载下来,如果不缺东西,那正好,如果发现部分依赖找不到,那就还切换回官网中央仓库。
如何将maven依赖的下载地址改为国内镜像地址呢?
很简单,直接修改本地安装好的maven的conf目录下的settings.xml配置文件即可。
找到settings.xml配置文件中的标签,在这个标签中添加国内镜像地址,
在这里我们使用阿里云的maven国内镜像地址
alimaven aliyun maven http://maven.aliyun.com/nexus/content/groups/public/ central
修改好之后重启idea,让他重新加载项目的依赖,或者打开cmd命令行窗口,进入到hadoop-mapreduce-client-core这个子项目,执行编译命令,这样就会开始下载项目需要的依赖了。
其实我是倾向于在cmd命令行窗口中执行,这样能看到下载的详细进度信息,并且下载进程也不会卡死,使用idea编辑器自动下载依赖时,时间过长可能会导致下载进程卡死
C:Usersyehua>cd D:IdeaProjectshadoop-3.2.0-srchadoop-mapreduce-projecthadoop-mapreduce-clienthadoop-mapreduce-client-core C:Usersyehua>d: D:IdeaProjectshadoop-3.2.0-srchadoop-mapreduce-projecthadoop-mapreduce-clienthadoop-mapreduce-client-core>mvn clean compile-DskipTests [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 11.261s [INFO] Finished at: Fri Apr 24 22:03:19 CST 2020 [INFO] Final Memory: 35M/518M [INFO] ------------------------------------------------------------------------
最后看到BUILD SUCCESS就说明编译成功了,编译成功也也就意味着这个项目的依赖都下载成功了。
下面就可以在idea中重新打开这个项目了
注意:打开这个项目打开以后需要在项目的src/main/java目录上点击右键–>Mark Directory as–>Sources ROOT
如果不执行这个操作,后面在查看源码的时候按Ctrl+左键,不能进入类或方法里面
接下来我们找到项目中的InputFormat抽象类,这个抽象类中一共就两个方法
getSplit createRecordReader
看getSplit的注释
Logically split the set of input files for the job.
逻辑切分Job中输入文件的集合,其实这个就是我们前面说的MapReduce会对输入数据进行逻辑切分,获取InputSplit
所以这个方法的作用就是把输入文件的集合切分为InputSplit的集合
ListgetSplits(JobContext context) throws IOException, InterruptedException;
接着来看一下createRecordReader的注释
Create a record reader for a given split. The framework will call
{@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
the split is used.
针对每一个InputSplit创建一个RecordReader,在split被使用之前,框架会先调用RecordReader的initialize初始化方法。
RecordReader3、getSplits方法的具体实现代码createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;
接下来先详细分析一下getSplits方法的具体实现代码,在FileInputFormat中有具体实现。
public List4、面试题getSplits(JobContext job) throws IOException { StopWatch sw = new StopWatch().start(); //获取InputSplit的size的最小值minSize和最大值maxSize long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); // 创建List,准备保存生成的InputSplit List splits = new ArrayList (); //获取输入文件列表 List files = listStatus(job); boolean ignoreDirs = !getInputDirRecursive(job) && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false); //迭代输入文件列表 for (FileStatus file: files) { //是否忽略子目录,默认不忽略 if (ignoreDirs && file.isDirectory()) { continue; } //获取 文件/目录 路径 Path path = file.getPath(); //获取 文件/目录 长度 long length = file.getLen(); if (length != 0) { //保存文件的Block块所在的位置 BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } //判断文件是否支持切割,默认为true if (isSplitable(job, path)) { //获取文件的Block大小,默认128M long blockSize = file.getBlockSize(); //计算split的大小 long splitSize = computeSplitSize(blockSize, minSize, maxSize); //还需要处理的文件剩余字节大小,其实就是这个文件的原始大小 long bytesRemaining = length; // while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); //组装InputSplit splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } //最后会把bytesRemaining/splitSize<=1.1的那一部分内容作为一个InputSplit if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable //如果文件不支持切割,执行这里 if (LOG.isDebugEnabled()) { // Log only if the file is big enough to be splitted if (length > Math.min(file.getBlockSize(), minSize)) { LOG.debug("File is not splittable so no parallelization " + "is possible: " + file.getPath()); } } //把不支持切割的文件整个作为一个InputSplit splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files splits.add(makeSplit(path, 0, length, new String[0])); } } // Save the number of input files for metrics/loadgen job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", Timetaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits; }
根据刚才的分析,下面我们来看几个面试题?
(1)一个1G的文件,会产生多少个map任务?Block块默认是128M,所以1G的文件会产生8个Block块
默认情况下InputSplit的大小和Block块的大小一致,每一个InputSplit会产生一个map任务
所以:1024/128=8个map任务
一个文件,不管再小,都会占用一个block,所以这1000个小文件会产生1000个Block,
那最终会产生1000个InputSplit,也就对应着会产生1000个map任务
一个140M的文件,会产生多少个map任务?
根据前面的分析
注意:这个有点特殊,140M/128M=1.09375<1.1
所以,这个文件只会产生一个InputSplit,也最终也就只会产生1个map 任务。
这个文件其实再稍微大1M就可以产生2个map 任务了。
说一千、道一万 不如真刀真枪的干一场。
来实际操刀验证一下:
执行GenerateDat.java代码,会生成两个文件,一个140M,一个141M
GenerateDat.java代码如下:
package com.imooc.mr;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
public class GenerateDat {
public static void main(String[] args) throws Exception{
generate_140M();
generate_141M();
}
private static void generate_141M() throws IOException {
String fileName = "D:\s_name_141.dat";
System.out.println("start: 开始生成141M文件->"+fileName);
BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
int num = 0;
while(num<8221592){
bfw.write("zhangsan beijing");
bfw.newline();
num ++;
if(num%10000==0){
bfw.flush();
}
}
System.out.println("end: 141M文件已生成");
}
private static void generate_140M() throws IOException {
String fileName = "D:\s_name_140.dat";
System.out.println("start: 开始生成140M文件->"+fileName);
BufferedWriter bfw = new BufferedWriter(new FileWriter(fileName));
int num = 0;
while(num<8201592){
bfw.write("zhangsan beijing");
bfw.newline();
num ++;
if(num%10000==0){
bfw.flush();
}
}
System.out.println("end: 140M文件已生成");
}
}
start: 开始生成140M文件->D:s_name_140.dat
end: 140M文件已生成
start: 开始生成141M文件->D:s_name_141.dat
end: 141M文件已生成
[root@bigdata01 soft]# ll ..... -rw-r--r--. 1 root root 147616384 Apr 24 23:33 s_name_140.dat -rw-r--r--. 1 root root 147976384 Apr 24 23:33 s_name_141.dat(3)将这两个文件上传到HDFS的根目录
[root@bigdata01 soft]# hdfs dfs -put s_name_140.dat / [root@bigdata01 soft]# hdfs dfs -put s_name_141.dat / [root@bigdata01 soft]# hdfs dfs -ls / ........ -rw-r--r-- 2 root supergroup 147616384 2020-04-24 23:37 /s_name_140.dat -rw-r--r-- 2 root supergroup 147976384 2020-04-24 23:37 /s_name_141.dat(4)使用我们之前开发的WordCount代码分别计算着两个文件
注意,在使用WordCount代码的时候建议把代码中的日志输出信息注释掉,否则任务在执行过程中会打印过多的日志信息,额外占用linux本地磁盘空间(5)计算140M文件
[root@bigdata01 soft]# cd hadoop-3.2.0 [root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /s_name_140.dat /out140
然后到yarn的8088界面查看,发现map任务只有一个
[root@bigdata01 hadoop-3.2.0]# hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /s_name_141.dat /out141
然后到yarn的8088界面查看,发现map任务有两个
6、接下来看一下createRecordReader方法createRecordReader的具体实现是在TextInputFormat类中 //针对每一个InputSplit都会创建一个RecordReader @Override public RecordReadercreateRecordReader(InputSplit split, TaskAttemptContext context) { //获取换行符,默认是没有配置的 String delimiter = context.getConfiguration().get( "textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); //创建一个行阅读器 return new LineRecordReader(recordDelimiterBytes); }
在createRecordReader的最后会创建一个 LineRecordReader,所以接下来看一下这个类,前面我们说过了,框架会先调用这个阅读器的初始化方法initialize
//初始化方法
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
//获取传过来的InputSplit,将InputSplit转换成子类FileSplit
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
//MAX_LINE_LENGTH对应的参数默认没有设置,所以会取Integer.MAX_VALUE
this.maxLineLength = job.getInt(MAX_LINE_LENGTH, Integer.MAX_VALUE);
//获取InputSplit的起始位置
start = split.getStart();
//获取InputSplit的结束位置
end = start + split.getLength();
//获取InputSplit的路径
final Path file = split.getPath();
// 打开文件,并且跳到InputSplit的起始位置
final FileSystem fs = file.getFileSystem(job);
fileIn = fs.open(file);
// 获取文件的压缩信息
CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
//如果文件是压缩文件,则执行if中的语句
if (null!=codec) {
isCompressedInput = true;
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn =
((SplittableCompressionCodec)codec).createInputStream(
fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new CompressedSplitLineReader(cIn, job,
this.recordDelimiterBytes);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn;
} else {
if (start != 0) {
// So we have a split that is only part of a file stored using
// a Compression codec that cannot be split.
throw new IOException("Cannot seek in " +
codec.getClass().getSimpleName() + " compressed stream");
}
in = new SplitLineReader(codec.createInputStream(fileIn,
decompressor), job, this.recordDelimiterBytes);
filePosition = fileIn;
}
} else {
//如果文件是未压缩文件(普通文件),则执行else中的语句
//跳转到文件中的指定位置
fileIn.seek(start);
//针对未压缩文件,创建一个阅读器读取一行一行的数据
in = new UncompressedSplitLineReader(
fileIn, job, this.recordDelimiterBytes, split.getLength());
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
//如果start不等于0,表示不是第一个InputSplit,所以就把start的值重置为第二行的起始位置
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
分析完初始化方法,接下来看一下这个阅读器中最重要的方法nextKeyValue()
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
// k1 就是每一行的起始位置
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
if (pos == 0) {
newSize = skipUtfByteOrderMark();
} else {
//读取一行数据,赋值给value,也就是v1
newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
pos += newSize;
}
if ((newSize == 0) || (newSize < maxLineLength)) {
break;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos " +
(pos - newSize));
}
if (newSize == 0) {
key = null;
value = null;
return false;
} else {
return true;
}
}
二、OutputFormat
OutputFormat分析
前面我们分析了InputFormat,下面我们来分析一下OutputFormat,顾名思义,这个是控制MapReduce输出的。
OutputFormat是输出数据的顶层基类 FileOutputFormat:文件数据处理基类 TextOutputFormat:默认文本文件处理类
这几个其实和InputFormat中的那几个文本处理类是对应着的,当然了针对输出数据还有其它类型的处理类,我们在这先分析最常见的文本文件处理类,其他类型的等我们遇到具体场景再具体分析。
我们来看一下OutputFormat的源码,这个类主要由三个方法
getRecordWriter checkOutputSpecs getOutputCommitter
先看一下getRecordWriter这个方法的具体实现,在TextOutputFormat中
public RecordWritergetRecordWriter(TaskAttemptContext job ) throws IOException, InterruptedException { Configuration conf = job.getConfiguration(); //任务的输出数据是否需要压缩,默认为false,也就是不压缩 boolean isCompressed = getCompressOutput(job); //获取输出的key(k3)和value(v3)之间的分隔符,默认为t String keyValueSeparator= conf.get(SEPARATOR, "t"); CompressionCodec codec = null; String extension = ""; if (isCompressed) { Class extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class); codec = ReflectionUtils.newInstance(codecClass, conf); extension = codec.getDefaultExtension(); } //获取输出文件路径信息 Path file = getDefaultWorkFile(job, extension); FileSystem fs = file.getFileSystem(conf); //获取文件输出流 FSDataOutputStream fileOut = fs.create(file, false); if (isCompressed) { return new LineRecordWriter<>( new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator); } else { //创建行书写器 return new LineRecordWriter<>(fileOut, keyValueSeparator); } }
最后会创建一个行书写器,具体看一下代码实现,它里面有一个核心方法write()
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
//如果key和value都是null 则直接返回
if (nullKey && nullValue) {
return;
}
//如果key不为null,则写入key
if (!nullKey) {
writeObject(key);
}
//key和value都不为null,这里面的条件才会成立,会写入key和value的分隔符
//注意:视频中上面这一行的注释添加的有误,以这里为准
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
//如果value不为null,则写入value
if (!nullValue) {
writeObject(value);
}
//写入换行符
out.write(newline);
}
接着是checkOutputSpecs方法,这个方法的具体实现在FileOutputFormat中
public void checkOutputSpecs(JobContext job
) throws FileAlreadyExistsException, IOException{
// Ensure that the output directory is set and not already there
//根据输出目录创建path
Path outDir = getOutputPath(job);
//如果获取不到,则说明任务没有设置输出目录,直接报错即可
if (outDir == null) {
throw new InvalidJobConfException("Output directory not set.");
}
// get delegation token for outDir's file system
TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { outDir }, job.getConfiguration());
//判断输出目录是否已存在,如果存在则抛异常
if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
throw new FileAlreadyExistsException("Output directory " + outDir +
" already exists");
}
}
最后看一下getOutputCommitter,这个方法的具体实现在FileOutputFormat中
//获取输出数据提交器,负责将数据写入到输出目录中
public synchronized
OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
committer = PathOutputCommitterFactory.getCommitterFactory(
output,
context.getConfiguration()).createOutputCommitter(output, context);
}
return committer;
}



