- 1 MapReduce 概述
- 1.1 MapReduce 定义
- 1.2 MapReduce 优缺点
- 1.2.1 优点
- 1.2.2 缺点
- 1.3 MapReduce 核心思想
- 1.4 MapReduce 进程
- 1.5 常用数据序列化类型
- 1.6 MapReduce 编程规范
- 1.7 WordCount 案例实操
- 1.7.1 本地测试
- 1.7.2 提交到集群测试
- 2 Hadoop 序列化
- 2.1 序列化概述
- 2.2 自定义 bean 对象实现序列化接口(Writable)
- 2.3 序列化案例实操
MapReduce 是一个分布式运算程序 的编程框架,是用户开发“基于 Hadoop 的数据分析应用”的核心框架。
MapReduce 核心功能是将用户编写的业务逻辑代码 和自带默认组件 整合成一个完整的分布式运算程序 ,并发运行在一个 Hadoop 集群上。
- MapReduce 易于编程
它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的 PC 机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得 MapReduce 编程变得非常流行。 - 良好的扩展性
当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。 - 高容错性
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高的容错性。 比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。 - 适合 PB 级以上海量数据的离线处理
可以实现上千台服务器集群并发工作,提供数据处理能力。
- 不擅长实时计算
MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回结果。 - 不擅长流式计算
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。 - 不擅长 DAG(有向无环图)计算
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce 并不是不能做,而是使用后, 每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。
一个完整的 MapReduce 程序在分布式运行时有三类实例进程:
- MrAppMaster:负责整个程序的过程调度及状态协调。
- MapTask:负责 Map 阶段的整个数据处理流程。
- ReduceTask:负责 Reduce 阶段的整个数据处理流程。
用户编写的程序分成三个部分:Mapper、Reducer 和 Driver。
-
需求:在给定的文本文件中统计输出每一个单词出现的总次数
-
需求分析:按照 MapReduce 编程规范,分别编写 Mapper,Reducer,Driver。
-
环境准备
-
创建 maven 工程,MapReduceDemo
-
在 pom.xml 文件中添加如下依赖
org.apache.hadoop hadoop-client 3.1.3 junit junit 4.12 org.slf4j slf4j-log4j12 1.7.30 -
在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入。
```java log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n ```
-
创建包名:com.zjw.mapreduce.wordcount
-
-
编写程序(千万注意清楚导入有关类的包名,这里很容易出错)
- 编写 Mapper 类
package com.zjw.mapreduce.combiner; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WordCountMapper extends Mapper
{ private Text outK = new Text(); private IntWritable outV = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 // atguigu atguigu String line = value.toString(); // 2 切割 // atguigu // atguigu String[] words = line.split(" "); // 3 循环写出 for (String word : words) { // 封装outk outK.set(word); // 写出 context.write(outK, outV); } } } - 编写 Reducer 类
package com.atguigu.mapreduce.combiner; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WordCountReducer extends Reducer
{ private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; // atguigu, (1,1) // 累加 for (IntWritable value : values) { sum += value.get(); } outV.set(sum); // 写出 context.write(key,outV); } } - 编写Driver类
package com.atguigu.mapreduce.combiner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1 获取job Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置jar包路径 job.setJarByClass(WordCountDriver.class); // 3 关联mapper和reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 4 设置map输出的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 设置最终输出的kV类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // job.setCombinerClass(WordCountCombiner.class); // // job.setNumReduceTasks(0); job.setCombinerClass(WordCountReducer.class); // // 6 设置输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path("D:\Input")); FileOutputFormat.setOutputPath(job, new Path("D:\Output")); // 7 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } } -
本地测试
- 在打包之前,我们可以在集群动态的给文件输入路径,输出路径赋值,先修改WordCountDriver以下代码
// 6 设置输入路径和输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
用 maven 打 jar 包,需要添加的打包插件依赖
maven-compiler-plugin 3.6.1 1.8 1.8 maven-assembly-plugin jar-with-dependencies make-assembly package single maven-assembly-plugin这个插件是将项目的依赖一起打包成jar包,如果集群中有相应的依赖可以不用这个插件
-
将程序打成 jar 包
4. 修改不带依赖的 jar 包名称为 wc.jar,并拷贝该 jar 包到 Hadoop 集群的/opt/module/hadoop/hadoop-3.1.3 路径。
5. 启动 Hadoop 集群
6. 执行 WordCount 程序
确保执行之前HDFS 中有input文件夹以及对应要执行wordcount的文件,而且不能有output文件夹
hadoop jar wc.jar com.zjw.MapReduceWordCount.WordCountDriver /input /output
执行完毕后HDFS出现结果:
- 什么是序列化?
序列化就是把 内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输 。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。 - 为什么要序列化?
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而 序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机 。 - 为什么不用 Java 的序列化
Java 的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop 自己开发了一套序列化机制(Writable)。 - Hadoop 序列化特点:
- 紧凑 :高效使用存储空间。
- 快速:读写数据的额外开销小。
- 互操作:支持多语言的交互
在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在 Hadoop 框架内部传递一个 bean 对象,那么该对象就需要实现序列化接口。
具体实现 bean 对象序列化步骤如下 7 步:
-
必须实现 Writable 接口
-
反序列化时,需要反射调用空参构造函数,所以必须有空参构造
public FlowBean() { super(); } -
重写序列化方法
@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } -
重写反序列化方法
@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } -
注意反序列化的顺序和序列化的顺序完全一致.
-
要想把结果显示在文件中,需要重写 toString(),可用"t"分开,方便后续用。
-
如果需要将自定义的 bean 放在 key 中传输,则还需要实现 Comparable 接口,因为MapReduce 框中的 Shuffle 过程要求对 key 必须能排序。详见后面排序案例。
@Override public int compareTo(FlowBean o) { // 倒序排列,从大到小 return this.sumFlow > o.getSumFlow() ? -1 : 1; }
-
需求
- 统计每一个手机号耗费的总上行流量、总下行流量、总流量
- 输入格式
- 输出格式
-
需求分析
-
编写 MapReduce 程序
FlowBean.java
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class FlowBean implements Writable {
private long upFlow; // 上行流量
private long downFlow; // 下行流量
private long sumFlow; // 总流量
// 空参构造
public FlowBean() {
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow + "t" + downFlow + "t" + sumFlow;
}
}
FlowMapper.java
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowMapper extends Mapper{ private Text outK = new Text(); private FlowBean outV = new FlowBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 // 1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200 String line = value.toString(); // 2 切割 // 1,13736230513,192.196.100.1,www.atguigu.com,2481,24681,200 7 - 3= 4 // 2 13846544121 192.196.100.2 264 0 200 6 - 3 = 3 String[] split = line.split("t"); // 3 抓取想要的数据 // 手机号:13736230513 // 上行流量和下行流量:2481,24681 String phone = split[1]; String up = split[split.length - 3]; String down = split[split.length - 2]; // 4封装 outK.set(phone); outV.setUpFlow(Long.parseLong(up)); outV.setDownFlow(Long.parseLong(down)); outV.setSumFlow(); // 5 写出 context.write(outK, outV); } }
FlowReducer.java
import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowReducer extends Reducer{ private FlowBean outV = new FlowBean(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 1 遍历集合累加值 long totalUp = 0; long totaldown = 0; for (FlowBean value : values) { totalUp += value.getUpFlow(); totaldown += value.getDownFlow(); } // 2 封装outk, outv outV.setUpFlow(totalUp); outV.setDownFlow(totaldown); outV.setSumFlow(); // 3 写出 context.write(key, outV); } }
FlowDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 1 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar
job.setJarByClass(FlowDriver.class);
// 3 关联mapper 和Reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4 设置mapper 输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 设置最终数据输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 设置数据的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path("E:input"));
FileOutputFormat.setOutputPath(job, new Path("E:Output"));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
结果:



