MapReduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
1. MapReduce易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序跟写一个串行式程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。
2. 良好的扩展性
当你的计算资源不能得到满足时,你可以通过简单的增加机器来扩展它的计算能力。
3. 高容错性
MapReduce设计初衷就是使程序能够部署在廉价的PC机器上,这就要求它有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败。而这个过程不需要人工参与,完全由Hadoop内部完成。
4. 适合PB以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
1. 不擅长实时计算
MapReduce无法像MySQL一样,在毫秒或秒级内返回结果。
2. 不擅长流式计算
流式计算输入数据是动态的,而MapReduce的输入数据是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
3. 不擅长DAG(有向图)计算
多个应用存在依赖关系,后一个应用程序为前一个应用程序输出。在这种情况下MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入磁盘,造成大量磁盘IO导致性能非常低下。
MapReduce编程核心思想
1)MapReduce运算程序一般需要分成2个阶段:Map阶段和Reduce阶段。
2)Map阶段的并发MapTask,完全并行运行,互不相干。
3)Reduce阶段的Reduce Task,完全互不相干,但是它们的数据依赖于上一个阶段的所有Map Task并发实例的输出。
4)Map Reduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户逻辑非常复杂那就只能多个Map Reduce程序串行运行。
一个完整的MapReduce程序在分布式运行时有三类实例进程:
1)MrAppMaster:负责整个程序的过程调度及状态协调。
2)MapTask:负责Map阶段整个数据处理流程。
3)ReduceTask:负责Reduce阶段整个数据处理流程。
| Java类型 | Hadoop Writable 类型 |
|---|---|
| Boolean | BooleanWritable |
| Byte | ByteWritable |
| Int | IntWritable |
| Float | FloatWritable |
| Long | LongWritable |
| Double | DoubleWritable |
| String | Text |
| Map | MapWritable |
| Array | ArrayWritable |
用户编写的程序分成三个部分:Mapper、Reducer 和 Driver:
1. Mapper阶段
1)用户自定义的Mapper要继承自己的父类。
2)Mapper的输入是KV对的形式(KV的类型可以自己定义)。
3)Mapper中的业务逻辑写在map()方法中。
4)Mapper的输出数据是KV对的形式(KV类型可以自定义)。
5)MapTask进程对每一个
2. Reducer阶段
1)用户自定义的Reducer要继承自己的父类。
2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV。
3)Reducer中的业务逻辑写在reduce()方法中。
4)ReduceTask进程对每一个
3. Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象。
1.目标
统计文件中每一个单词出现次数。
2.环境装备
org.apache.hadoop hadoop-common 3.3.1 org.apache.hadoop hadoop-client 3.3.1 org.apache.hadoop hadoop-hdfs 3.3.1
3.编写程序
(1)编写Mapper类
package com.sly.hadoop.demo.mapreduce.map; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 输出 for (String word : words) { context.write(new Text(word), new IntWritable(1)); } } }
(2)编写Reducer类
package com.sly.hadoop.demo.mapreduce.reduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 框架会安装key将所要的KV分组 values是key的所有值的集合 // 1 累加求和 int sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2 输出 context.write(key, new IntWritable(sum)); } }
(3)编写Driver类
package com.sly.hadoop.demo.mapreduce.driver;
import com.sly.hadoop.demo.mapreduce.map.WordCountMapper;
import com.sly.hadoop.demo.mapreduce.reduce.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 设置jar加载路径
job.setJarByClass(WordCountDriver.class);
// 3 设置map和reduce类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
(4)配置日志文件
在resource下创建log4j.properties
log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
4.本地测试
(1)文件hello.txt
hello lala hadoop hdfs yarn I dislike work overtime word world every thing is no thing
(2)设置程序启动参数
D:studyhadoophello.txt D:studyhadoopwordcount
(3)运行driver main方法
(4)运行结果
查看part-r-00000
I 1 dislike 1 every 1 hadoop 1 hdfs 1 hello 1 is 1 lala 1 no 1 overtime 1 thing 2 word 1 work 1 world 1 yarn 1
5.集群测试
(1)打包
(2)上传程序到hadoop
[hadoop@hadoop110 mapreduce]$ pwd /opt/module/mapreduce [hadoop@hadoop110 mapreduce]$ ls wc.jar
(3)执行
[hadoop@hadoop110 mapreduce]$ hadoop jar wc.jar com.sly.hadoop.demo.mapreduce.driver.WordCountDriver /demo/api/test1/hello.txt /demo/api/test1/wordcount
(4)查看结果
[hadoop@hadoop110 mapreduce]$ hadoop fs -cat /demo/api/test1/wordcount/part-r-00000 I 1 dislike 1 every 1 hadoop 1 hdfs 1 hello 1 is 1 lala 1 no 1 overtime 1 thing 2 word 1 work 1 world 1 yarn 12.Haodoop序列化 2.1概述 2.1.1 序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便存储到磁盘(持久化)和网络传输。
反序列化就是将收到的字节序列(或其他数据传输协议)或是磁盘的持久化数据,转换为内存中的对象。
一般来说,活的对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能发送到网络的另一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
2.1.3 不用java序列化的原因 Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以Hadoop自己开发了一套系列化机制(Writable)。
Hadoop序列化特点:
(1)紧凑:高效使用存储空间。
(2)快速:读写数据额外开销小。
(3)可扩展:随着通信协议的升级而可升级。
(4)互操作:支持多语言交互。
在企业开发中往往常用的基本序列化类型不能满足所有需求, 比如在 Hadoop 框架内部
传递一个 bean 对象, 那么该对象就需要实现序列化接口。
实现步骤:
(1)必须实现Writable接口。
(2)反序列化时,需要反射调用空参构造函数,所以必须有空参构造。
(3)重写序列化方法。
(4)重写反序列化方法。
(5)注意反序列化的顺序和序列化的顺序完全一致。
(6)要想把结果显示在文件中,需要重写 toString(),可用” t” 分开,方便后续用。
(7)如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。
统计每个电话上下行流量总和。
输入(使用t分割): 电话 IP 上行 下行 状态 18671769057 192.168.0.120 1145 525 200 18175862145 192.168.0.121 154 58 200 13945214586 192.168.0.123 7458 214 200 18671769056 192.168.0.120 12 7541 200 18175862145 192.168.0.121 1254 518 200 13945214586 192.168.0.123 458 2014 200 输出: 电话 上行 下行 总和 13945214586 7916 2228 10144 18175862145 1408 576 1984 18671769056 12 7541 7553 18671769057 1145 525 16702.3.2 编写 MapReduce 程序
PhoneData
package com.sly.hadoop.demo.mapreduce.model;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class PhoneData implements Writable {
private long upCount;
private long downCount;
private long sumCount;
public PhoneData() {
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upCount);
out.writeLong(downCount);
out.writeLong(sumCount);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upCount = in.readLong();
this.downCount = in.readLong();
this.sumCount = in.readLong();
}
@Override
public String toString() {
return upCount + "t" + downCount + "t" + sumCount;
}
public long getUpCount() {return upCount;}
public void setUpCount(long upCount) {this.upCount = upCount;}
public long getDownCount() {return downCount;}
public void setDownCount(long downCount) {this.downCount = downCount;}
public long getSumCount() {return sumCount;}
public void setSumCount(long sumCount) {this.sumCount = sumCount;}
}
Mapper
package com.sly.hadoop.demo.mapreduce.map; import com.sly.hadoop.demo.mapreduce.model.PhoneData; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class PhoneDataMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 获取一行 String line = value.toString(); String[] values = line.split("t"); // 封装phoneData对象 PhoneData phoneData = new PhoneData(); phoneData.setUpCount(Long.parseLong(values[2])); phoneData.setDownCount(Long.parseLong(values[3])); phoneData.setSumCount(phoneData.getUpCount() + phoneData.getDownCount()); // 输出 context.write(new Text(values[0]), phoneData); } }
Reducer
package com.sly.hadoop.demo.mapreduce.reduce; import com.sly.hadoop.demo.mapreduce.model.PhoneData; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class PhoneDataReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { PhoneData phoneData = new PhoneData(); for (PhoneData value : values) { phoneData.setUpCount(phoneData.getUpCount() + value.getUpCount()); phoneData.setDownCount(phoneData.getDownCount() + value.getDownCount()); phoneData.setSumCount(phoneData.getSumCount() + value.getSumCount()); } context.write(key, phoneData); } }
Driver
package com.sly.hadoop.demo.mapreduce.driver;
import com.sly.hadoop.demo.mapreduce.map.PhoneDataMapper;
import com.sly.hadoop.demo.mapreduce.model.PhoneData;
import com.sly.hadoop.demo.mapreduce.reduce.PhoneDataReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PhoneDataDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1.获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2.设置jar加载路径
job.setJarByClass(PhoneDataDriver.class);
// 3.设置map和reduce类
job.setMapperClass(PhoneDataMapper.class);
job.setReducerClass(PhoneDataReducer.class);
// 4.设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(PhoneData.class);
// 5.设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(PhoneData.class);
// 6.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7.提交并为用户打印进度
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
2.3.3 运行结果
3. MapReduce框架原理
3.1 InputFormat 数据输入
3.1.1 切片与 MapTask 并行度决定机制
(1)问题
MapTask 的并行度决定 Map 阶段的任务处理并发度,进而影响到整个 Job 的处理速度。
(2)MapTask 并行度决定机制
数据块:Block 是 HDFS 物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
1)通常切片大小和数据块大小保持一致,因为yarn优先分配为本地处理。如果大小一致则无需网络IO传输,如果不一致需要消耗网络IO传输数据,反而效率更低。
(1)Job 提交流程源码
// waitForCompletion() => submit(); // 1.建立连接 connect(); // 1)创建提交 Job 的代理 new Cluster(getConfiguration()); // 2)判断是本地 yarn 还是远程 initialize(jobTrackAddr, conf); // 2.提交job submitter.submitJobInternal(Job.this, cluster); // 1) 创建给集群提交数据的 Stag 路径 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); // 2) 获取 jobid , 并创建 Job 路径 JobID jobId = submitClient.getNewJobID(); // 3) 拷贝 jar 包到集群 copyAndConfigureFiles(job, submitJobDir); rUploader.uploadFiles(job, jobSubmitDir); // 4) 计算切片, 生成切片规划文件 writeSplits(job, submitJobDir); maps = writeNewSplits(job, jobSubmitDir); input.getSplits(job); // 5) 向 Stag 路径写 XML 配置文件 writeConf(conf, submitJobFile); conf.writeXml(out); // 6) 提交 Job,返回提交状态 status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
(2)FileInputFormat 切片源码
1)程序先找到你数据存储的目录。
2)开始遍历处理(规划切片)目录下每一个文件
3)遍历第一个文件
a)获取文件大小fs.sizeOf();
b)计算切片大小
computeSplitSize(Math.max(minSize, Math.min(maxSize, blockSize)))
c)默认情况下,切片大小=blockSize
d)开始切,形成第一个切片:0-128M,第二个切片128-256M,第三个切片256-300M(每次切片时都要判断剩余部分是否大于块的1.1倍,不大于1.1倍就划分一块切片)
e)将切片信息写到一个切片规划文件中
f)整个切片的核心过程在getSplit()方法中完成
g)InputSplit只记录了切片的元数据信息,比如起始位置、长度及所在的节点列表等。
4)提交切片规划文件到YARN上,YARN上的mrAppMaster就可以根据切片规划文件计算开启MapTask个数。
FileInputFormat切片机制
1.切片机制
(1)简单的按照文件内容长度进行切片
(2)切片大小默认等于block大小
(3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
2.案例分析
(1)输入数据2个文件
file1 ----- 320M
file2 ----- 20M
(2)经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.split1 ----- 0~128M
file1.split2 ----- 128~256M
file1.split3 ----- 256-320M
file2.split1 ----- 0~20M
FileInputFormat切片大小参数配置
(1)源码中计算切片大小的公式
Math.max(maxSize, Math.min(max.Size, blockSize));
mapreduce.input.fileinputformat.split.minsize=1(默认为1)
mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue(默认值Long.MAXValue)
因此默认情况下,切片大小=blockSize
(2)切片大小设置
maxsize(切片最大值):参数如果调的比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。
minsize(切片最小值):参数调的比blockSize大,则会让切片变得比blockSize还大。
(3)获取切片信息API
// 获取切片的文件名称 String name = inputSplit.getPath().getName(); // 根据文件类型获取切片信息 FileSplit inputSplit = (FileSplit)context.getInputSplit();3.1.4 CombineTextInputFormat 切片机制
框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
1.应用场景
CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。
2.虚拟存储切片最大值设置
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 4M
注意: 虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
3.切片机制
生成切片过程包括: 虚拟存储过程和切片过程二部分。
(1)虚拟存储过程:
将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不大于设置的最大值, 逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时将文件均分成 2 个虚拟存储块(防止出现太小切片)。
例如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M,则先逻辑上分成一个4M。剩余的大小为 4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储文件,所以将剩余的 4.02M 文件切分成(2.01M 和 2.01M)两个文件。
(2)切片过程:
(a)判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并, 共同形成一个切片。
(c)测试举例:有 4 个小文件大小分别为 1.7M、5.1M、3.4M 以及 6.8M 这四个小文件,则虚拟存储之后形成 6 个文件块,大小分别为:
1.7M,(2.55M、2.55M),3.4M 以及(3.4M、3.4M)
最终会形成 3 个切片,大小分别为:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M。
1.需求
将输入的大量小文件合并成一个切片统一处理。
(1)输入数据
准备5个小文件
(2)期望
一个切片处理多个文件
2.实现
(1)运行PhoneDataDemo观察切片数为5。
(2)在PhoneDataDriver中添加如下代码:
// 如果不设置 InputFormat, 它默认用的是 TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置 4m CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);3.1.6 FileInputFormat 实现类
在运行MapReduce程序时,输入的文件格式包括:基于行的日志文件、二进制文件、数据库表等。那么针对不同的数据类型,MapReduce是如何读取这些数据的呢。
FileInputFormat常见接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。
1.TextInputFormat
TextInputFormat是默认的FIleInputFormat实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何终止符(换行符和回车符),Text类型。
2.KeyValueTextInputFormat
每一行均为一天记录,被分隔符分割为key,value。可以通过驱动类中的设置**conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, “t”);**来设定分隔符,默认分隔符是tad(t)。
3.NLineInputFormat
如果使用NLineInputFormat,代表每个Map进程处理的InputSplit不再按照block块去划分,而是按照NLineInputFormat指定的行数N来划分。即输入文件的总行数/N=切片数,如果不能整除,切片数=商+1。
1.需求
统计输入文件中每一行的第一个单次相同的行数。
(1)输入数据
damao 大毛数据1 ermao 二毛数据1 sanmao 三毛数据1 simao 四毛数据1 sanmao 三毛数据2 ermao 二毛数据2
(2)期望结果数据
damao 1 ermao 2 sanmao 2 simao 1
2.实现
(1)编写Mapper类
package com.sly.hadoop.demo.mapreduce.map; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class KeyValueInputFormatDemoMapper extends Mapper{ @Override protected void map(Text key, Text value, Mapper .Context context) throws IOException, InterruptedException { context.write(key, new IntWritable(1)); } }
(2)编写Reducer类
package com.sly.hadoop.demo.mapreduce.reduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class KeyValueInputFormatDemoReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { int totalCount = 0; for (IntWritable value : values) { totalCount += value.get(); } context.write(key, new IntWritable(totalCount)); } }
(3)编写Driver类
package com.sly.hadoop.demo.mapreduce.driver;
import com.sly.hadoop.demo.mapreduce.map.KeyValueInputFormatDemoMapper;
import com.sly.hadoop.demo.mapreduce.reduce.KeyValueInputFormatDemoReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class KeyValueInputFormatDemoDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取配置信息以及封装任务
Configuration configuration = new Configuration();
configuration.set(KeyValueLineRecordReader.KEY_VALUE_SEPARATOR, "t");
Job job = Job.getInstance(configuration);
// 2.设置jar加载路径
job.setJarByClass(KeyValueInputFormatDemoDriver.class);
// 3.设置map和reduce类
job.setMapperClass(KeyValueInputFormatDemoMapper.class);
job.setReducerClass(KeyValueInputFormatDemoReducer.class);
// 4.设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.设置输入格式
job.setInputFormatClass(KeyValueTextInputFormat.class);
// 7.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8.提交 并为用户打印进度
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
执行后结果:
damao 1 ermao 2 sanmao 2 simao 13.1.8 NLineInputFormat Demo
1.需求
(1)输入数据
统计单词进行个数,要求根据每个输入文件的行数来规定输出多少个切片。此案例要求每三行放入一个切片中。
(2)期望结果
日志打印分片方式为:Number of splits:4
2.实现
(1)编写Mapper类
package com.sly.hadoop.demo.mapreduce.map; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class NLineInputFormatDemoMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { String[] split = value.toString().split("t"); for (String s : split) { context.write(new Text(s), new IntWritable(1)); } } }
(2)编写Reducer类
package com.sly.hadoop.demo.mapreduce.reduce; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class NLineInputFormatDemoReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { int totalCount = 0; for (IntWritable value : values) { totalCount += value.get(); } context.write(key, new IntWritable(totalCount)); } }
(3)编写Driver类
package com.sly.hadoop.demo.mapreduce.driver;
import com.sly.hadoop.demo.mapreduce.map.NLineInputFormatDemoMapper;
import com.sly.hadoop.demo.mapreduce.reduce.NLineInputFormatDemoReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class NLineInputFormatDemoDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2.设置jar加载路径
job.setJarByClass(NLineInputFormatDemoDriver.class);
// 3.设置map和reduce类
job.setMapperClass(NLineInputFormatDemoMapper.class);
job.setReducerClass(NLineInputFormatDemoReducer.class);
// 4.设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5.设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6.NLineInputFormat设置,每个切片InputSplit中划分三条记录
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 3);
// 7.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8.提交 并为用户打印进度
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
(4)运行结果
2021-12-19 17:01:30,775 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:43.1.9 自定义InputFormat
在开发中,Hadoop框架自带的InputFormat类型不能满足所有应用场景,需要自定义InputFormat来解决问题。
自定义InputFormat步骤如下:
(1)自定义一个类继承FileInputFormat。
(2)改写ReadRecorder,实现一次读取一个完整文件封装为KV。
(3)在输出时使用SequenceFileOutPutFormat输出合并文件。
1.需求
将多个小文件合并成一个 SequenceFile 文件(SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对的文件格式),SequenceFile 里面存储着多个文件,存储的形式为文件路径+名称为 key,文件内容为 value。
2.实现
(1)编写SelfDefineFileInputFormat类
package com.sly.hadoop.demo.mapreduce.fileinputformat; import com.sly.hadoop.demo.mapreduce.recordreader.SelfDefineRecordReader; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import java.io.IOException; public class SelfDefineFileInputFormat extends FileInputFormat{ @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { SelfDefineRecordReader recordReader = new SelfDefineRecordReader(); recordReader.initialize(split, context); return recordReader; } @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } }
(2)编写SelfDefineRecordReader类
package com.sly.hadoop.demo.mapreduce.recordreader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; public class SelfDefineRecordReader extends RecordReader{ private Configuration configuration; private FileSplit split; private boolean isProgress = true; private BytesWritable value = new BytesWritable(); private Text key = new Text(); @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit)split; configuration = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (isProgress) { // 1 定义缓存区 byte[] contents = new byte[(int) split.getLength()]; FileSystem fs; FSDataInputStream fis = null; try { // 2 获取文件系统 Path path = split.getPath(); fs = path.getFileSystem(configuration); // 3 读取数据 fis = fs.open(path); // 4 读取文件内容 IOUtils.readFully(fis, contents, 0, contents.length); // 5 输出文件内容 value.set(contents, 0, contents.length); // 6 获取文件路径及名称 String name = split.getPath().toString(); // 7 设置输出的 key 值 key.set(name); } catch (Exception e) { e.getStackTrace(); } finally { IOUtils.closeStream(fis); } isProgress = false; return true; } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return key; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { } }
(3)编写Mapper类
package com.sly.hadoop.demo.mapreduce.map; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class SequenceFileMapper extends Mapper{ @Override protected void map(Text key, BytesWritable value, Mapper .Context context) throws IOException, InterruptedException { context.write(key, value); } }
(4)编写Reducer类
package com.sly.hadoop.demo.mapreduce.reduce; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class SequenceFileReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Reducer .Context context) throws IOException, InterruptedException { context.write(key, values.iterator().next()); } }
(5)编写Driver类
package com.sly.hadoop.demo.mapreduce.driver;
import com.sly.hadoop.demo.mapreduce.fileinputformat.SelfDefineFileInputFormat;
import com.sly.hadoop.demo.mapreduce.map.SequenceFileMapper;
import com.sly.hadoop.demo.mapreduce.reduce.SequenceFileReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import java.io.IOException;
public class SequenceFileDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 1.获取配置信息以及封装任务
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2.设置jar加载路径
job.setJarByClass(SequenceFileDriver.class);
// 3.设置map和reduce类
job.setMapperClass(SequenceFileMapper.class);
job.setReducerClass(SequenceFileReducer.class);
// 4.设置map输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
// 5.设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 6.SelfDefineFileInputFormat设置
job.setInputFormatClass(SelfDefineFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// 7.设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 8.提交 并为用户打印进度
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
(6)运行结果



