- 是一个分布式运算程序的编程框架
- 核心功能是:将用户编写的业务逻辑代码,和自带默认组件整合成一个完整的分布式运算程序,并运行在一个Hadoop集群上。
- 优点:
- 易于编程:实现一些简单的接口,即可完成一个分布式程序;
- 良好的扩展性:增加机器即可拓展计算能力;
- 高容错:集群一台机器挂了,可以将其上的计算任务转移到另一节点上运行,不至于让此程序失败
- 适合海量数据的离线处理
- 缺点
- 不擅长实时计算
- 不擅长流式计算:流式计算数据输入是动态的,而MapReduce的数据输入必须是静态的,由设计特点决定
- 不擅长做DAG(有向无环图):一个程序的输出,作为另一个程序的输入,MapReduce的作业输出都会写入磁盘,会导致大量的磁盘IO,效率低下
- 核心思想
- 数据输入:分片,Map阶段会以逻辑分片的理念对要计算的文件进行读取(默认128M)
- Map阶段:将文件进行逻辑划分后,进行分割处理,MapTask(MT)并发进行
- Reduce阶段:对Map阶段处理好的数据进行汇总,ReduceTask(RT)并发进行
- 一个MR编程模型只能包含一个Map阶段,和一个Reduce阶段,如果业务复杂,只能写多个MR程序。
- 一个MR程序可以没有Reduce阶段,如果Map阶段的输出结果恰好是业务所需的话
一个完整的MR程序在分布式运行时有三类实例进程
- MapAppMaster:负责整个MR程序的过程调度及状态协调
- MapTask:负责Map阶段的数据处理流程
- ReduceTas:负责Reduce阶段的数据处理流程
- 驱动类:负责提交job
- Mapper:继承Hadoop提供的Map类,指定map阶段输入,输出的KV类型。重写map()方法,在里面编写业务逻辑代码。map()方法(MapTask进程)每有一个键值对,调用一次
- Reduce:继承Hadoop提供的Reduce类,指定reduce阶段输入,输出的KV类型。重写reduce()方法,在里面编写业务逻辑代码。reduce()方法,每有一组相同的K的KV组,调用一次。
- Driver类:
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获取配置对象以及job类
Configuration con = new Configuration();
Job job = Job.getInstance(con);
//指定本Driver的类型
job.setJarByClass(WordCountDriver.class);
//指定mapper和reducer的类型
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReduce.class);
//指定mapper输出KV的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定最终输出KV的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定输入输出路径
FileInputFormat.setInputPaths(job,new Path("D:\hello.txt"));
FileOutputFormat.setOutputPath(job,new Path("D:\output"));
//提交job true:控制台打印执行过程 false:不打印
job.waitForCompletion(true);
}
}
- Mapper类
public class WordCountMapper extends Mapper{ //重写mapper方法 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //map阶段输出的V都是1,打标记 final IntWritable v = new IntWritable(1); Text k = new Text(); //读取,切割,输出 String str = value.toString(); String[] strs = str.split(" "); for (String word : strs) { k.set(word); context.write(k,v); } } }
- Reducer类:
public class WordCountReduce extends ReducerMR的提交方式{ //重写reduce方法 @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; //累加 ,输出 for (IntWritable value : values) { sum += value.get(); } context.write(key,new IntWritable(sum)); } }
- 本地运行:自测
- 打包后上传到集群运行:hadoop jar jar包路径 运行程序全类名 输入路径 输出路径
- 在Windows上向集群提交job(详见笔记)
- 序列化:把内存中的对象,转换成字节序列(或其他数据传输协议),以便于存储到磁盘(持久化)和网络传输
- 反序列化:将收到的字节序列(或其他传输协议),或是磁盘的持久化数据,转换成内存中的对象
- 为何要序列化:一般情况下,存活的对象只能存在内存中,关机断电则消失。且只能由本地的进程使用,不能发送到别的机器。序列化可以存储存活的对象,也可以将存活的对象发给远程计算机
- 为何不用java的序列化:java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化之后,会附带很多的额外信息(各种校验信息,Header,继承体系等),不利于在网络中的高速传输。所以Hadoop有一套自己的序列化机制——Writable。紧凑、高效、可扩展。
//1.必须实现Writable接口
public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;
//2.反序列化时,要调用空参构造,如无定义其他有参构造可不特意声明
public FlowBean(){
super();
}
//3.重写序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
//4.重写反序列化方法,序列化方法和反序列化方法顺序完全一致!
@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
//5.想把结果展示在文件中,需要重写toString()
@Override
public String toString() {
return upFlow + "t" + downFlow + "t" +sumFlow;
}
}
手写序列化案例
- FlowBean如上
- FlowDriver
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//获取配置对象以及job类
Configuration con = new Configuration();
Job job = Job.getInstance(con);
//指定本Driver的类型
job.setJarByClass(FlowDriver.class);
//指定mapper和reducer的类型
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReduce.class);
//指定mapper输出KV的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
//指定最终输出KV的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//指定输入输出路径
FileInputFormat.setInputPaths(job,new Path("D:\尚硅谷大数据\录屏+笔记+资料\7.hadoop\02.资料\07_测试数据\phone_data"));
FileOutputFormat.setOutputPath(job,new Path("D:\testMapReduce\output"));
//提交job true:控制台打印执行过程 false:不打印
job.waitForCompletion(true);
}
}
- FlowMapper
public class FlowMapper extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取一行数据(map是一行一行读取) String line = value.toString(); //分割 String[] strs = line.split("t"); //获取各个数据 String phone = strs[1]; String upFlow = strs[strs.length - 3]; String downFlow = strs[strs.length - 2]; FlowBean outValue = new FlowBean(); outValue.setUpFlow(Long.parseLong(upFlow)); outValue.setDownFlow(Long.parseLong(downFlow)); outValue.setSumFlow(Long.parseLong(upFlow)+Long.parseLong(downFlow)); //写出 context.write(new Text(phone),outValue); } }
- FlowReducer
public class FlowReduce extends ReducerMapReduce框架原理(重点) InputFormat数据输入{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum_upFlow = 0; int sum_downFlow = 0; //累加各个参数 for (FlowBean value : values) { sum_upFlow += value.getUpFlow(); sum_downFlow += value.getDownFlow(); } FlowBean outValue = new FlowBean(); outValue.setUpFlow(sum_upFlow); outValue.setDownFlow(sum_downFlow); outValue.setSumFlow(sum_downFlow+sum_upFlow); //写出 context.write(key,outValue); } }
简略描述:InputFormat --> map --> reduce --> OutputFormat
详细描述:InputFormat --> map sort --> copy sort group reduce --> OutputFormat
– InputFormat 抽象类的子实现类是 FileInputFormat
– FileInputFormat 类的子实现类是 TextInputFormat
在InputFormat中有两个抽象方法:
MapTask的并行度决定Map阶段的任务处理并发度,进而影响整个Job的处理速度
-
问题:MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?
-
切片:
- 数据块:Block,是HDFS在物理上吧数据分成一块块的,是HDFS存储的数据单位
- 数据切片:只是在逻辑上对输入进行分片,不会在磁盘上进行切分。数据切片是MapReduce程序输入数据的单位,一个切片会对应启动一个MapTask
- 切片大小默认情况下等于切块大小,目的是计算机读取数据时不会跨机器读取,提高数据读取效率
- 切片的时候不考虑数据的整体集,默认情况下对单个文件进行切片
public ListgetSplits(JobContext job) throws IOException { //计时器,记录该次切片过程用时 StopWatch sw = new StopWatch().start(); long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); //跟进一步后得出:maxSize=Long.MAX_VALUE,可通过配置文件改变 long maxSize = getMaxSplitSize(job); // generate splits List splits = new ArrayList (); List files = listStatus(job); boolean ignoreDirs = !getInputDirRecursive(job) && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false); for (FileStatus file: files) { //不忽略目录,对Job中设置的输入路径中的文件以及子目录中的文件全都处理 if (ignoreDirs && file.isDirectory()) { continue; } Path path = file.getPath(); //获取文件大小 long length = file.getLen(); if (length != 0) { //获取当前文件所对应的文件块信息 BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(job, path)) { //获取当前文件块大小 long blockSize = file.getBlockSize(); long splitSize = computeSplitSize(blockSize, minSize, maxSize); //当前文件剩余大小 long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); / int patitionNum ; String phone = text.toString(); if(phone.startsWith("136")){ patitionNum = 0; }else if(phone.startsWith("137")){ patitionNum = 1; }else if(phone.startsWith("138")){ patitionNum = 2; }else if(phone.startsWith("139")){ patitionNum = 3; }else { patitionNum = 4; } return patitionNum; } }
- 在Driver类里面添加配置:
//指定RT数量,即分区数量 job.setNumReduceTasks(5); //指定自定义分区器类的class job.setPartitionerClass(FlowPartition.class);
- 执行结果如下:
- 分区器注意事项:
- 当RT的数量设置 > 逻辑代码中实际用到的分区数,会产生空白的分区文件
- 当RT的数量设置 < 逻辑代码中实际用到的分区数,报错
- 当RT的数量设置 = 1时,结果会输出到一个文件中,代码论证如下:
- MT和RT都会对数据按照Key进行排序,且该操作属于默认操作,无论逻辑上是否需要
- 默认的排序规则是字典序,且该排序的实现方法是快速排序
- 在MT中的排序:
1、在环形缓冲区中存储达到阈值时,会对缓冲区中的数据进行一次排序,并将这些有序数据溢写到磁盘上,
2、当所有的MT执行完毕后,会对磁盘上的所有溢写文件进行归并排序 - 在RT中的排序:从MT上远程拷贝数据文件,如果文件大小超过阈值,则溢写到磁盘上。否则存储在内存中。
1、当磁盘上的文件数目达到阈值,则进行一次归并排序生成一个更大的文件
2、当内存中的文件大小或者数量达到阈值,则进行一次合并后溢写到磁盘上。
3、当所有数据拷贝完毕后,RT统一对内存和磁盘上的所有数据进行一次归并排序
- 1、部分排序(区内排序)
- 2、全排序(效率低)
- 3、辅助排序
- 4、二次排序
- 让参与比较的对象实现WritableComparable接口,并实现compareTo方法,自定义比较规则
- 这种情况,运行时Hadoop会自动生成比较器对象WritableComparator。
- 需求:按总流量进行排序(默认降序)
- Bean对象:
public class ComparableBean implements WritableComparable{ //自定义排序规则 @Override public int compareTo(ComparableBean bean) { return bean.getSumFlow().compareTo(this.sumFlow); } }
- Mapper对象:此时要注意,参与排序的对象类型要作为输出的K
public class ComparableMapper extends Mapper{}
- Reducer对象:同上,且无需做操作,为了防止对数据进行分组合并,展示不全,直接遍历输出即可
public class ComparableReduce extends Reducer<**ComparableBean**,Text,ComparableBean,Text> {
@Override
protected void reduce(ComparableBean key, Iterable values, Context context) throws IOException, InterruptedException {
//不用做任何操作,为了防止reduce分组数据展示不全,遍历输出即可
for (Text value : values) {
context.write(key,value);
}
}
}
全局排序的实现方式二:自定义Comparator比较器对象
- WritableComparator是Hadoop默认的比较器对象,如果想自定义,则需要继承他,重写compare()方法,自定义比较规则
- 在该类中通过无参构造,将自定义比较器与要参与排序的bean进行关联
- 最后在Driver中指定自定义比较器
- 自定义比较器对象:
public class Comparator extends WritableComparator {
//无参构造,指定当前自定义比较器为哪个类服务
//传true,绑定的bean对象才会初始化
public Comparator() {
super(ComparatorBean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
ComparatorBean beanA = (ComparatorBean) a;
ComparatorBean beanB = (ComparatorBean) b;
return -beanA.getSumFlow().compareTo(beanB.getSumFlow());
}
}
- Driver类:
//指定自定义比较器 job.setSortComparatorClass(Comparator.class);区内排序的实现
- 只需要在分区器上,将输出的KV类型调换,在Driver里面指定RT数量和分区器类型即可实现
- 区内排序,将分区器和比较器进行了解耦
- Patitioner:
public class FlowBeanPartitioner extends Partitioner{}
- Driver:
job.setNumReduceTasks(5); job.setPartitionerClass(FlowBeanPartitioner.class);通过源码分析,Hadoop选取比较器对象的规则是什么
- 切入点分析:想要直接排序,肯定是在Map方法初始化的时候就把这些加载好了,所以去MapTask初始化阶段去找
- 源码分析:
- 以IntWritable为例:在类进行初始化的时候,就维护了<自身类型,WritableComparable类型>,所以Hadoop自带的数据类型不需要什么操作就可以进行比较排序
- Combiner是MR程序中,Mapper和Reducer之外的一个组件
- 其父类就是Reducer
- 跟Reducer的区别在于:
- Reducer是接收全局Mapper的输出结果
- Combiner是在每一个MT里执行
- Combiner的意义:提升MR程序运行效率,对每一个MT的输出汇总合并,减少了RT在MT里拉取数据的数量,也减少了磁盘的IO开销
- 合并其实是在Reducer里面做的,Combiner分担了Reducer的压力,但是仍然需要Reducer,因为要对所有MT的文件进行合并
- Combiner的使用步骤:
- 自定义Combiner类,直接继承Reducer类
- 在Driver里面指定Combiner类型
- 自定义Combiner类:
//Combiner输入KV类型为Map端输出KV类型,输出KV类型为Reduce端输入KV类型 public class WordCombiner extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //跟Reduce端的操作一模一样 int sum = 0; //累加 ,输出 for (IntWritable value : values) { sum += value.get(); } context.write(key,new IntWritable(sum)); } }
- Driver:
//指定Combiner类型
job.setCombinerClass(WordCombiner.class);
Combiner效果展示
Combiner不适用的场景
- Reduce端处理的数据考虑到多个MapTask的数据的整体集时就不能提前合并了。
- 如,求平均值,Map不能进行合并
- 当我们对MR最终的结果有个性化制定的需求,如修改输出文件的名字和路径等。可以通过自定义OutputFormat来实现
- 需求:过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。
- 自定义OutPutFormat类:
//输出的时候只有key public class LogOutputFormat extends FileOutputFormat{ //获取 RecordWriter 来进行数据写出 @Override //若直接new RecordWriter的话,会要求实现里面的方法,所以提出来,单独写一个类,继承RecordWriter public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { LogRecordWriter recordWriter = new LogRecordWriter(job); return recordWriter; } }
- 自定义RecordWriter类:
public class LogRecordWriter extends RecordWriter{ //用hadoop提供的api创建输入、输出流对象 private String atguiguPath = "D:\testMapReduce\atguigu\atguigu.txt"; private String otherPath = "D:\testMapReduce\other\other.txt"; FSDataOutputStream atguigu; FSDataOutputStream other; FileSystem fs; public LogRecordWriter(TaskAttemptContext job) throws IOException { fs = FileSystem.get(job.getConfiguration()); atguigu = fs.create(new Path(atguiguPath)); other = fs.create(new Path(otherPath)); } @Override public void write(Text key, NullWritable value) throws IOException, InterruptedException { String line = key.toString(); if(line.contains("atguigu")){ atguigu.writeBytes(line+"n"); }else { other.writeBytes(line+"n"); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { IOUtils.closeStream(other); IOUtils.closeStream(atguigu); IOUtils.closeStream(fs); } }
- Driver类:
//指定OutPutFormat
job.setOutputFormatClass(LogOutputFormat.class);
//指定输出路径
FileOutputFormat.setOutputPath(job,new Path("D:\testMapReduce\output"));
- Mapper和Reducer只需要遍历输出即可
- 执行结果如下:
- 在MR程序中计算数据的时候,出现输入文件是多个,且文件之间存在关联性,需要在计算过程中通过两个文件之间相互关联才能得到结果
- 实现步骤:
- 在Map阶段对文件进行数据整合,并且让关联字段作为输出数据的Key
- 当一组相同数据的key进入Reduce阶段后,先把两个文件数据分离出来,分别放到各自的对象中维护
- 把当前一组维护好的数据进行关联
- 可能会导致数据倾斜:分区不同,Reduce端有的RT可能会承担很高的负载
- Map端:
public class JoinMapper extends Mapper{ FileSplit fileSplit; String fileName; Text outk = new Text(); OrderPd outv = new OrderPd(); @Override protected void setup(Context context) throws IOException, InterruptedException { //分辨数据来源,用文件name //获取切片文件,用FileSplit接 fileSplit = (FileSplit) context.getInputSplit(); //获取文件名 fileName = fileSplit.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //对多个文件进行数据整合,并且让关联字段作为输出数据的key String line = value.toString(); //分割 String[] datas = line.split("t"); //用文件名确认数据来源,合并 //order文件:1001 01 1 if(fileName.contains("order")){ outk.set(datas[1]); outv.setOrderId(Integer.parseInt(datas[0])); outv.setPid(Integer.parseInt(datas[1])); outv.setAmount(Integer.parseInt(datas[2])); outv.setPname(""); outv.setTitle(fileName); }else { //数据来源pd文件:01 小米 outk.set(datas[0]); outv.setOrderId(0); outv.setPid(Integer.parseInt(datas[0])); outv.setAmount(0); outv.setPname(datas[1]); outv.setTitle(fileName); } context.write(outk,outv); } }
- Reducer
public class JoinReduce extends ReducerMap Join{ List pdList = new ArrayList(); OrderPd pd = new OrderPd(); @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //核心逻辑(先把数据根据来源进行分离,然后做join) for (OrderPd value : values) { String title = value.getTitle(); if(title.contains("order")){ //hadoop自身改写了jvm规则,需要自己new一个对象去接 try { OrderPd thisOrderPd = new OrderPd(); BeanUtils.copyProperties(thisOrderPd,value); pdList.add(thisOrderPd); } catch (Exception e) { e.printStackTrace(); } }else { try { BeanUtils.copyProperties(pd,value); } catch (Exception e) { e.printStackTrace(); } } } //做join关联 for (OrderPd orderPd : pdList) { orderPd.setPname(pd.getPname()); context.write(orderPd,NullWritable.get()); } //清空list pdList.clear(); } }
- 考虑整体MR的执行效率,且业务场景为:一个小文件和一个大文件进行关联时,可用Map join。也是解决数据倾斜的很有效的办法
- 实现步骤:
- 将小文件的数据映射到内存中的一个容器维护起来
- 当MT要处理大文件的数据时,没读取一行数据,就根据当前行总的关联字段到内存的容器里获取对象的信息
- 封装结果输出
- 代码实现略



