MapReduce是一个分布式运算程序的编程框架,核心功能是将用户编写的业务逻辑代码和自带的默认组件,整合成一个完整的分布式运算程序,并行的运行在一个Hadoop集群上。
1. 优缺点- 易于编程:简单的实现了MR接口,就可以完成一个分布式程序。
- 良好的扩展性:当计算资源不能满足时,可以通过简单的增加机器来扩展计算能力
- 高容错性:若一台机器挂了,会自动将正在运行的计算任务转移到另外一个节点上运行,不至于这个任务运行失败
- 适合处理PB级别以上的离线数据,但不擅长ms级别的实时计算和流式计算
- 不擅长DAG(有向无环图)计算,因为每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。
MR工作流程
- MR运算程序一般分成Map阶段和Reduce阶段
- Map阶段会以逻辑分片的理念对要计算的文件进行读取(128M)
- 第一个阶段的MapTask是对文件进行逻辑划分后进行分割处理,每个MT之间完全并行运行,每个MT之间不相关
- 第二阶段的ReduceTask是将Map阶段处理好的数据进行汇总,之间也是并行运行,每个RT之间不想关
- 一个MR程序只能包含一个Map阶段和Reduce阶段,若业务逻辑非常复杂,就只能多个MR程序穿行运行,但会产生大量的IO效率低下
MR进程
一个完整的MR程序在分布式运行时有三类实例继承:
- MrAppMaster:负责整个程序的过程调度及状态协调
- MapTask:负责Map阶段的真个数据处理流程
- ReduceTask:负责Reduce阶段的整个数据处理流程
Mapper阶段
- 用户自定义的Mapper要继承自己的父类
- Mapper的输入数据是KV对的形式
- Mapper中的业务逻辑在Map()方法中实现
- Mapper的输出数据是KV对的形式
- 每一行数据对Map()方法调用一次
Reducer阶段
- 用户自定义的Reducer要继承自己的父类
- Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
- Reducer的业务逻辑在reduce()方法中实现
- ReduceTask进程对每一组相同的K的KV组调用一次reduce()方法
Driver阶段
相当于Yarn集群的客户端,用于提交整个程序到Yarn集群,提交的是封装了MapReduce程序相关运行参数的job对象
2. WordCount实现实现Mapper类
public class WordCountMapper extends Mapper{ Text k = new Text(); IntWritable v = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取读取的一行数据 String line = value.toString(); // 2 切割 String[] words = line.split(" "); // 3 输出 for (String word : words) { k.set(word); context.write(k, v); } } }
实现Reducer类
public class WordcountReducer extends Reducer{ int sum; IntWritable v = new IntWritable(); @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { // 1 累加求和 sum = 0; for (IntWritable count : values) { sum += count.get(); } // 2 输出 v.set(sum); context.write(key,v); } }
实现Driver类
public class WordcountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 集群运行需要配置
Configuration configuration = new Configuration();
//设置HDFS NameNode的地址
configuration.set("fs.defaultFS", "hdfs://hadoop102:9820");
// 指定MapReduce运行在Yarn上
configuration.set("mapreduce.framework.name","yarn");
// 指定mapreduce可以在远程集群运行
configuration.set("mapreduce.app-submission.cross-platform","true");
//指定Yarn resourcemanager的位置
configuration.set("yarn.resourcemanager.hostname","hadoop103");
// 获取配置信息以及获取job对象
Job job = Job.getInstance(configuration);
// 2 关联本Driver程序的jar
job.setJarByClass(WordcountDriver.class);
// 3 关联Mapper和Reducer的jar
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
// 4 设置Mapper输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
三、Hadoop序列化
Hadoop并没有采用Java的序列化机制,Java的序列化是一个重量级序列化框架,一个对象被序列化后会附带很多额外的信息,不便于在网络中传输,所以自己开发了一套序列化机制。
1. 自定义实现序列化接口-
必须实现Writable接口
-
反序列化时,需要反射调用空参构造函数,所以必须空参构造
public FlowBean() { super(); } -
重写序列化方法
@Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } -
重写反序列化方法
@Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } -
注意反序列化的顺序和序列化的顺序完全一致
实现writable接口
// 1 实现writable接口
public class FlowBean implements Writable{
private long upFlow;
private long downFlow;
private long sumFlow;
//2 反序列化时,需要反射调用空参构造函数,所以必须有
public FlowBean() {
super();
}
public FlowBean(long upFlow, long downFlow) {
super();
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow + downFlow;
}
//3 写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//4 反序列化方法
//5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
// 6 编写toString方法,方便后续打印到文本
@Override
public String toString() {
return upFlow + "t" + downFlow + "t" + sumFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
}
编写Mapper类
public class FlowCountMapper extends Mapper{ FlowBean v = new FlowBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 切割字段 String[] fields = line.split("t"); // 3 封装对象 // 取出手机号码 String phoneNum = fields[1]; // 取出上行流量和下行流量 long upFlow = Long.parseLong(fields[fields.length - 3]); long downFlow = Long.parseLong(fields[fields.length - 2]); k.set(phoneNum); v.set(downFlow, upFlow); // 4 写出 context.write(k, v); } }
编写Reduce类
public class FlowCountReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context)throws IOException, InterruptedException { long sum_upFlow = 0; long sum_downFlow = 0; // 1 遍历所用bean,将其中的上行流量,下行流量分别累加 for (FlowBean flowBean : values) { sum_upFlow += flowBean.getUpFlow(); sum_downFlow += flowBean.getDownFlow(); } // 2 封装对象 FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow); // 3 写出 context.write(key, resultBean); } }
编写Driver驱动类
public class FlowsumDriver {
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[] { "e:/input/inputflow", "e:/output1" };
// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
// 2 指定本程序的jar包所在的本地路径
job.setJarByClass(FlowsumDriver.class);
// 3 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// 4 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}



