文章目录
- Hadoop进阶篇
- MapReduce:Hadoop分布式并行计算框架
- MapReduce的理解
- MapReduce的核心思想
- MapReduce 编程模型
- MapReduce编程指导思想【八大步骤】
- Map 阶段 2 个步骤
- shuffle 阶段 4 个步骤
- reduce 阶段 2 个步骤
- MapReduce编程入门——单词统计
-
- MapReduce的运行模式
-
- Map Task数量及切片机制
- 1. MapTask个数
- 2. 如何控制 mapTask 的个数
- MapReduce 的 InputFormat
- 1. FileInputFormat常用类介绍
- 2. 使用CombineTextInputFormat实现切片个数控制
-
- 3. CombineTextInputFormat 示例
- 4. KeyValueTextInputFormat 示例
- 5. NlineInputFormat 示例
- 自定义InputFormat
- 第一步:自定义 RecordReader
- 第二步:自定义 InputFormat
- 第三步:定义测试类
- MapReduce的partitioner详解
- 1. 默认分区器 HashPartitioner
- 2. 自定义分区器
-
Hadoop进阶篇
MapReduce:Hadoop分布式并行计算框架
MapReduce的理解
- MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。
- MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
MapReduce的核心思想
- MapReduce 的核心思想是:分而治之。适用于大量复杂的任务处理场景(大规模数据处理场景)。即使是发布过论文实现分布式计算的谷歌也只是实现了这种思想,而不是自己原创。
- Map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。 Reduce负责“合”,即对map阶段的结果进行全局汇总。这两个阶段合起来正是MapReduce思想的体现。
- 举例:我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就越快。然后把所有人的统计数加在一起。这就是“Reduce”。
MapReduce 编程模型
- MapReduce是采用一种分而治之的思想设计出来的分布式计算框架,那什么是分而治之呢?
- 比如一复杂、计算量大、耗时长的的任务,暂且称为“大任务”;
- 此时使用单台服务器无法计算或较短时间内计算出结果时,可将此大任务切分成一个个小的任务,小任务分别在不同的服务器上并行的执行;
- 最终再汇总每个小任务的结果。
- MapReduce 由两个阶段组成:
- Map 阶段:切分成一个个小的任务
- Reduce 阶段:汇总小任务的结果
- Map 阶段:
- map 阶段有一个关键的 map() 函数;
- 此函数的输入是键值对
- 输出是一系列键值对,输出写入本地磁盘。
- Reduce 阶段:
- reduce 阶段有一个关键的函数 reduce() 函数;
- 此函数的输入也是键值对,即 map 的输出(KV对);
- 输出也是一系列键值对,结果最终写入 HDFS。
- Map&Reduce:
MapReduce编程指导思想【八大步骤】
- 通过 MapReduce 编程模型总结,进行 MapReduce 开发一共有八大步骤,其中:
- map 阶段分为 2 个步骤;
- shuffle 阶段分为 4 个步骤;
- reduce 阶段分为 2 个步骤。
Map 阶段 2 个步骤
- 第一步:设置 InputFormat 类,将数据切分成 Key、Value 对,此 K-V 对作为第二步的输入;
- 第二步:自定义 map 逻辑,处理我们第一步传过来的 K-V 对数据,然后转换成新的 Key、Value 对,并输出。
shuffle 阶段 4 个步骤
- 第三步:对上一步输出的 K-V 对进行分区,相同 Key 的 K-V对属于同一分区;
- 第四步:对每个分区的数据按照 Key 进行排序;
- 第五步:对分区中的数据进行规约(combine 操作),降低数据的网络拷贝【可选步骤】;
- 第六步:对排序后的 K-V 对数据进行分组,分组过程中,key 相同的 K-V 对为一组,将同一组的 K-V 对的所有 value 放到一个集合当中,每组数据调用一次 reduce 方法。
reduce 阶段 2 个步骤
- 第七步:对多个 map 的任务进行合并、排序、写 reduce 函数自己的逻辑,对输入的 key、value 对进行处理,转换成新的 key、value 对进行输出;
- 第八步:设置将输出的 key、value 对数据保存到文件中。
MapReduce编程入门——单词统计
hadoop 当中常用的数据类型
- hadoop没有沿用java当中基本的数据类型,而是自己进行封装了一套数据类型,其自己封装的类型与java的类型对应如下,下表是常用的数据类型对应的Hadoop数据序列化类型
| Java类型 | Hadoop Writable类型 |
|---|
| boolean | BooleanWritable |
| byte | ByteWritable |
| int | IntWritable |
| float | FloatWritable |
| long | LongWritable |
| double | DoubleWritable |
| String | Text |
| Map | MapWritable |
| Array | ArrayWritable |
| byte[] | BytesWritable |
词频统计
- 需求:现有数据格式如下,每一行数据之间都是使用逗号进行分割,求取每个单词出现的次数。
- 定义 Mapper 类:
public class MyMapper extends Mapper {
private IntWritable intWritable = new IntWritable(1);
private Text text = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(",");
for (String word : words) {
// 将每个单词出现都记做 1 次
text.set(word);
// 将我们的k2、v2写出去到下游
context.write(text, intWritable);
}
}
}
public class MyReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int result = 0;
for (IntWritable value : values) {
// 将我们的结果进行累加
result += value.get();
}
// 继续输出我们的数据
IntWritable intWritable = new IntWritable(result);
// 将我们的数据输出
context.write(key, intWritable);
}
}
public class WordCounter extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// 获取Job对象,组装我们的八个步骤,每一个步骤都是一个class类
Configuration conf = super.getConf();
Job job = Job.getInstance(conf, WordCounter.class.getSimpleName());
// 判断输出路径是否存在,如果存在则删除
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(args[1]))) {
fs.delete(new Path(args[1]), true);
}
// 实际工作中,程序运行完之后一般都是打包到集群上面去运行,打成一个 jar 包
// 如果要打包到集群上面运行,必须添加以下设置
job.setJarByClass(WordCounter.class);
// 第一步:读取文件,解析成key、value对,k1: 行偏移量,v1: 一行文本内容
job.setInputFormatClass(TextInputFormat.class);
// 指定我们去哪一个路径读取文件
TextInputFormat.addInputPath(job, new Path(args[0]));
// 第二步:自定义map逻辑,接收k1、v1,转换成新的k2、v2输出
job.setMapperClass(MyMapper.class);
// 设置map阶段输出的key、value的类型,其实就是k2、v2的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 第三步到六步:分区、排序、规约、分组。。。省略
// 第七步:自定义reduce逻辑
job.setReducerClass(MyReducer.class);
// 设置 key3、value3的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 第八步:输出k3、v3,进行保存
job.setOutputFormatClass(TextOutputFormat.class);
// 一定要注意,输出路径是需要不存在的,如果存在就报错
TextOutputFormat.setOutputPath(job, new Path(args[1]));
job.setNumReduceTasks(Integer.parseInt(args[2]));
// 提交job任务
boolean result = job.waitForCompletion(true);
return result ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
// 提交run方法之后,得到一个程序的退出状态码
int run = ToolRunner.run(configuration, new WordCounter(), args);
// 根据我们的程序的退出状态码,退出整个进程
System.exit(run);
}
}
hadoop jar hadoop-demo-1.0.jar com.yw.hadoop.mr.WordCounter /1.txt /wordcount01 3
MapReduce的运行模式
1. 本地模式
- MapReduce 程序是被提交给 LocalJobRunner 在本地以单进程的形式运行,而处理的数据及输出结果可以在本地文件系统,也可以在hdfs上。
- 怎样实现本地运行?写一个程序,不要带集群的配置文件。本质是程序的conf中是否有mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local参数。
- 本地模式非常便于进行业务逻辑的debug
configuration.set("mapreduce.framework.name","local");
configuration.set("yarn.resourcemanager.hostname","local");
TextInputFormat.addInputPath(job,new Path("input"));
TextOutputFormat.setOutputPath(job,new Path("output"));
2. 集群运行模式
- 将 MapReduce 程序提交给 yarn 集群,分发到很多的节点上并发执行,处理的数据和输出结果应该位于hdfs文件系统。
- 提交集群的实现步骤:将程序打成JAR包,然后在集群的任意一个节点上用hadoop命令启动
yarn jar hadoop-demo-1.0.jar com.yw.hadoop.mr.WordCounter /1.txt /wordcount01 3
Map Task数量及切片机制
1. MapTask个数
- 在运行我们的MapReduce程序的时候,我们可以清晰的看到会有多个mapTask的运行,那么 mapTask 的个数究竟与什么有关?
- 是不是 Map Task 越多越好,或者说是不是 mapTask 的个数越少越好呢?
- 我们可以通过MapReduce的源码进行查看 mapTask 的个数究竟是如何决定的。
- 在MapReduce当中,每个mapTask处理一个切片split的数据量,注意切片与block块的概念很像,但是block块是HDFS当中存储数据的单位,切片split是MapReduce当中每个MapTask处理数据量的单位。
- MapTask并行度决定机制:
- 数据块:Block 是 HDFS 物理上把数据分成一块一块
- 数据切片:只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储
- 查看 FileInputFormat 的源码,里面 getSplits 的方法便是获取所有的切片,其中有个方法便是获取切片大小
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
// mapreduce.input.fileinputformat.split.minsize=1 默认值为1
// mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值Long.MAXValue
// blockSize为128M
- 由以上计算公式可以推算出split切片的大小刚好与block块相等。
- 那么hdfs上面如果有以下两个文件,文件大小分别为300M和10M,那么会启动多少个MapTask?
file1.txt 300M
file2.txt 10M
- 经过FileInputFormat的切片机制运算后,形成的切片信息如下:一共就会有 4 个切片,与我们 block 块的个数刚好相等
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~300
file2.txt.split1-- 0~10M
- 如果有 1000 个小文件,每个小文件是 1KB~100MB 之间,那么我们启动 1000 个 MapTask 是否合适,该如何合理的控制 MapTask 的个数?
2. 如何控制 mapTask 的个数
- 如果需要控制 mapTask 的个数,我们只需调整 minSize 和 maxSize 这两个值,那么切片的大小就会改变,切片大小改变之后,mapTask的个数就会改变:
- maxSize(切片最大值):如果比 blockSize 小,则会让切片变小,而且就等于配置这个参数的值;
- minSize(切片最小值):如果比 blockSize 大,则可以让切片变得比 blockSize 还大
MapReduce 的 InputFormat
- InputFormat 是 MapReduce 当中用于处理数据输入的一个组件,是最顶级的一个抽象父类,主要用于解决各个地方的数据源的数据输入问题。
1. FileInputFormat常用类介绍
- FileInputFormat类也是InputFormat的一个子类。如果需要操作hdfs上面的文件,基本上都是通过FileInputFormat类来实现的,我们可以通过FileInputFormat来实现各种格式的文件操作
- FileInputFormat的子实现类的UML类图如下:
| 类名 | 主要作用 |
|---|
| TextInputFormat | 读取文本文件 |
| CombineFileInputFormat | 在 MR 当中用于合并小文件,将多个小文件合并之后只需要启动换一个mapTask进行运行 |
| SequenceFileInputFormat | 处理SequenceFile这种格式的数据 |
| KeyValueTextInputFormat | 通过手动指定分隔符,将每一条数据解析成为key,value对类型 |
| NLineInputFormat | 指定数据的行数作为一个切片 |
| FixedLengthInputFormat | 文件的每个record是固定的长度,用于读取固定的二进制记录 |
2. 使用CombineTextInputFormat实现切片个数控制
- 框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask。这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
- CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
- 虚拟存储切片最大值设置CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m。注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
切片机制
- 生成切片过程包括 ① 虚拟存储过程,② 切片过程。
虚拟存储过程
- 将输入目录下所有文件按照文件名称字典顺序排序,将每个文件的大小,依次和设置的setMaxInputSplitSize值比较:
- ①如果不大于设置的最大值,逻辑上划分一个虚拟存储块;
- ②如果输入文件大于最大值,小于最大值的2倍,那么会将文件平分为2个虚拟存储块;
- ③如果输入文件大于最大值的两倍,那么以最大值为单位切割出虚拟存储块;
- 当剩余数据大小大于设置的最大值,且小于等于最大值2倍时,此时将剩余数据均分成2个虚拟存储块(防止出现太小切片)。
- 举个例子:setMaxInputSplitSize值为4M
- 例子一:输入文件大小为8.02M,则先逻辑上分成一个4M,剩余的大小为4.02M。如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个虚拟存储文件。
- 例子二:输入文件大小为6.02M,那么4 < 6.02 < 8,生成两个虚拟存储文件3.01、3.01
切片过程
- ①判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片;
- ②如果不大于,则跟下一个虚拟存储文件进行合并;如果合并后,还不大于setMaxInputSplitSize,则继续与下一个虚拟存储文件进行合并;当合并后,大于setMaxInputSplitSize后,共同形成一个切片。
- 举个例子:
- 有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个虚拟存储块大小分别为:1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
- 最终会形成3个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
3. CombineTextInputFormat 示例
// 第一步:读取文件,解析成key、value对,k1: 行偏移量,v1: 一行文本内容
// job.setInputFormatClass(TextInputFormat.class);
job.setInputFormatClass(CombineTextInputFormat.class);
// 虚拟存储切片最大值设置4m 设置每个切片处理数据量为4M
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
// 指定我们去哪一个路径读取文件
CombineTextInputFormat.addInputPath(job, new Path(args[0]));
- 将我们的切片设置成为4M大小,然后重新打包运行,观察mapTask的个数
4. KeyValueTextInputFormat 示例
- KeyValueTextInputFormat允许我们自己来定义分隔符,通过分隔符来自定义我们的key和value,参见下面的数据,数据之间的分隔符为@zolen@ 数据内容如下
hello@zolen@ input datas today
count@zolen@ hadoop spark
hello@zolen@ input some datas to test
hello 2
count 1
- 查看 KeyValueLineRecordReader 的源码,发现切割参数的配置:
public class KeyValueMain {
static class KeyValueMapper extends Mapper {
private LongWritable outvalue = new LongWritable(1);
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
context.write(key, outvalue);
}
}
static class KeyValueReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
long result = 0;
for (LongWritable value : values) {
result += value.get();
}
context.write(key, new LongWritable(result));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("key.value.separator.in.input.line", "@zolen@");
Job job = Job.getInstance(conf);
job.setJarByClass(KeyValueMain.class);
// 第一步:读取文件,解析成key、value对
job.setInputFormatClass(KeyValueTextInputFormat.class);
KeyValueTextInputFormat.addInputPath(job, new Path(args[0]));
// 第二步:设置mapper类
job.setMapperClass(KeyValueMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 第三步到第六步:分区、排序、规约、分组
// 第七步:设置reducer类
job.setReducerClass(KeyValueReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 第八步:输出数据
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交job任务
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
5. NlineInputFormat 示例
- NlineInputFormat 允许我们自己定义输入的行数作为一个切片数据
- 代码实现:
public class NLineMain {
static class NLineMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(",");
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
}
}
static class NLineReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
long wordNum = 0L;
for (LongWritable value : values) {
wordNum += value.get();
}
context.write(key, new LongWritable(wordNum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(NLineMain.class);
// 第一步:设置每个分片包含的数据行数
NLineInputFormat.setNumLinesPerSplit(job, 3);
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.addInputPath(job, new Path(args[0]));
// 第二步:设置自定义mapper类
job.setMapperClass(NLineMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 第三步到第六步:分区、排序、规约、分组
// 第七步:设置自定义reduce类
job.setReducerClass(NLineReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 第八步:输出数据
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
自定义InputFormat
- MapReduce 框架当中已经给我们提供了很多的文件输入类,用于处理文件数据的输入,如果 MapReduce 提供的文件数据类还不够用的话,我们也可以通过自定义 InputFormat 来实现文件数据的输入
- 需求:现在有大量的小文件,我们通过自定义 InputFormat 实现将小文件全部读取,然后输出成为一个 SequenceFile 格式的大文件,进行文件的合并
第一步:自定义 RecordReader
public class MyRecordReader extends RecordReader {
private FileSplit fileSplit;
private Configuration configuration;
private BytesWritable bytesWritable;
private boolean flag = false;
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.configuration = context.getConfiguration();
this.bytesWritable = new BytesWritable();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!flag) {
int length = (int) fileSplit.getLength();
byte[] splitContent = new byte[length];
// 读取分片内容
Path path = fileSplit.getPath();
FileSystem fileSystem = path.getFileSystem(configuration);
FSDataInputStream fsdis = fileSystem.open(path);
// split 内容写入 splitContent
IOUtils.readFully(fsdis, splitContent, 0, length);
// 当前value值
bytesWritable.set(splitContent, 0, length);
flag = true;
IOUtils.closeStream(fsdis);
// fileSystem.close();
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return bytesWritable;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return flag ? 1.0f : 0.0f;
}
@Override
public void close() throws IOException {
}
}
第二步:自定义 InputFormat
public class MyInputFormat extends FileInputFormat {
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
MyRecordReader recordReader = new MyRecordReader();
recordReader.initialize(split, context);
return recordReader;
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
}
第三步:定义测试类
public class MyInputFormatMain extends Configured implements Tool {
static class MyMapper extends Mapper {
@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
// 文件名
FileSplit inputSplit = (FileSplit) context.getInputSplit();
String name = inputSplit.getPath().getName();
context.write(new Text(name), value);
}
}
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "mergeSmallFile");
// 如果要集群运行,需要加
job.setJarByClass(MyInputFormatMain.class);
job.setInputFormatClass(MyInputFormat.class);
MyInputFormat.addInputPath(job, new Path(args[0]));
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
// 没有reduce,但是要设置reduce的输出的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
// 将我们的文件输出成为 SequenceFile格式
job.setOutputFormatClass(SequenceFileOutputFormat.class);
SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new MyInputFormatMain(), args);
System.exit(run);
}
}
MapReduce的partitioner详解
- 在mapreduce执行当中,有一个默认的步骤就是partition分区:
- MR编程的第三步就是分区,这一步中决定了map生成的每个kv对,被分配到哪个分区里,那么这是如果做到的?要实现此功能,涉及到了分区器的概念;
- 分区主要的作用就是默认将 key 相同的 kv 对数据发送到同一个分区中,在mapreduce当中有一个抽象类叫做Partitioner。
1. 默认分区器 HashPartitioner
- 我们可以通过HashPartitioner的源码,查看到分区的逻辑如下:
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner implements Partitioner {
public void configure(JobConf job) {}
public int getPartition(K2 key, V2 value, int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
- 我们能看到:HashPartitioner 实现了 Partitioner 接口,并实现了 getPartition() 方法——此方法中对 key 取 hash 值,再与 MAX_VALUE 按位与,结果再模上 reduce 任务的个数,所以,能得出结论,相同的key会落入同一个分区中。
2. 自定义分区器
- 实际生产中,有时需要自定义分区的逻辑,让key落入我们想让它落入的分区,此时就需要自定义分区器,如何实现?
- 参考默认分区器 HashPartitioner,自定义分区器类,比如 CustomPartitioner
- 实现 Partitioner 接口,同时实现 getPartition 方法,此方法中实现分区逻辑;
- 将指定会议分区器逻辑添加到第三步中
// 设置自定义分区器
job.setPartitionerClass(CustomPartitioner.class);
// 设置reduce个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
需求
- 现有一份关于手机的流量数据,样本数据和数据格式说明如下:
- 需求:- 使用mr,实现将不同的手机号的数据划分到6个不同的文件里面去,每个文件保存的是
- 手机号
- 对应的上行数据包总个数
- 下行数据包总个数
- 上行总流量
- 下行总流量
- 具体划分规则如下
- 135开头的手机号分到一个文件里面去,
- 136开头的手机号分到一个文件里面去,
- 137开头的手机号分到一个文件里面去,
- 138开头的手机号分到一个文件里面去,
- 139开头的手机号分到一个文件里面去,
- 其他开头的手机号分到一个文件里面去
需求分析
- 根据 MR 编程八步,需要实现的代码有:
- 1、针对输入数据,设计 JavaBean;
- 2、自定义的Mapper逻辑(第二步):为了汇总每个手机号的包、数据量的总数,map输出的key应该是手机号;
- 3、自定义的分区类(第三步):实现自定义分区的逻辑;
- 4、自定义的Reducer逻辑(第七步):汇总每个手机号的总量;
- 5、main程序入口实现
代码实现
public class FlowBean implements Writable {
private Integer upFlow;
private Integer downFlow;
private Integer upCountFlow;
private Integer downCountFlow;
@Override
public void write(DataOutput out) throws IOException {
out.write(upFlow);
out.write(downFlow);
out.write(upCountFlow);
out.write(downCountFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readInt();
this.downFlow = in.readInt();
this.upCountFlow = in.readInt();
this.downCountFlow = in.readInt();
}
// setter、getter、toString 省略
}
public class FlowMapper extends Mapper {
private FlowBean flowBean;
private Text text;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
this.flowBean = new FlowBean();
this.text = new Text();
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("t");
String phoneNum = fields[1];
String upFlow = fields[6];
String downFlow = fields[7];
String upCountFlow = fields[8];
String downCountFlow = fields[9];
text.set(phoneNum);
flowBean.setUpFlow(Integer.parseInt(upFlow));
flowBean.setDownFlow(Integer.parseInt(downFlow));
flowBean.setUpCountFlow(Integer.parseInt(upCountFlow));
flowBean.setDownCountFlow(Integer.parseInt(downCountFlow));
context.write(text, flowBean);
}
}
public class CustomPartitioner extends Partitioner {
@Override
public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
String phoneNum = text.toString();
if (StringUtils.isNotBlank(phoneNum)) {
if (StringUtils.startsWith(phoneNum, "135")) {
return 0;
} else if (StringUtils.startsWith(phoneNum, "136")) {
return 1;
} else if (StringUtils.startsWith(phoneNum, "137")) {
return 2;
} else if (StringUtils.startsWith(phoneNum, "138")) {
return 3;
} else if (StringUtils.startsWith(phoneNum, "139")) {
return 4;
} else {
return 5;
}
} else {
return 5;
}
}
}
public class FlowReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
int upFlow = 0;
int downFlow = 0;
int upCountFlow = 0;
int downCountFlow = 0;
for (FlowBean value : values) {
upFlow += value.getUpFlow();
downFlow += value.getDownFlow();
upCountFlow += value.getUpCountFlow();
downCountFlow += value.getDownCountFlow();
}
context.write(key, new Text(upFlow + "t" + downFlow + "t" + upCountFlow + "t" + downCountFlow));
}
}
public class FlowMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
// 获取Job对象
Job job = Job.getInstance(super.getConf(), "flowCount");
job.setJarByClass(FlowMain.class);
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
job.setMapperClass(FlowMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 设置自定义分区器
job.setPartitionerClass(CustomPartitioner.class);
// 设置reduce个数
job.setNumReduceTasks(Integer.parseInt(args[2]));
job.setReducerClass(FlowReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new FlowMain(), args);
System.exit(run);
}
}
运行测试
- 对于我们自定义分区的案例,必须打成jar包上传到集群上面去运行
- 因为我们本地已经没法通过多线程模拟本地程序运行了,将我们的数据上传到hdfs上面去
- 然后通过 hadoop jar提交到集群上面去运行,观察我们分区的个数与reduceTask个数的关系
- 提前将 jar 程序包和测试数据文件上传到服务器,运行命令:
hadoop jar hadoop-demo-1.0.jar com.yw.hadoop.mr.p04_partitioner.FlowMain /mr_data/data_flow.dat /mr_out 6
- 到 YARN 集群和 HDFS 集群查看运行结果:
- 原因:实现Writable接口 重写write方法编写错误用了 DataOutput.write(参数),改为DataOutput.writeInt(参数)。
- 思考:
- 如果手动指定6个分区,reduceTask个数设置为3个会出现什么情况
- 如果手动指定6个分区,reduceTask个数设置为9个会出现什么情况