栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021.10.13 - 尚硅谷大数据 - hadoop3.1.3 - MapReduce篇

2021.10.13 - 尚硅谷大数据 - hadoop3.1.3 - MapReduce篇

MapReduce概述 定义
  • 是一个分布式运算程序的编程框架
  • 核心功能是:将用户编写的业务逻辑代码,和自带默认组件整合成一个完整的分布式运算程序,并运行在一个Hadoop集群上。
优点&缺点
  • 优点:
    • 易于编程:实现一些简单的接口,即可完成一个分布式程序;
    • 良好的扩展性:增加机器即可拓展计算能力;
    • 高容错:集群一台机器挂了,可以将其上的计算任务转移到另一节点上运行,不至于让此程序失败
    • 适合海量数据的离线处理
  • 缺点
    • 不擅长实时计算
    • 不擅长流式计算:流式计算数据输入是动态的,而MapReduce的数据输入必须是静态的,由设计特点决定
    • 不擅长做DAG(有向无环图):一个程序的输出,作为另一个程序的输入,MapReduce的作业输出都会写入磁盘,会导致大量的磁盘IO,效率低下
  • 核心思想
    • 数据输入:分片,Map阶段会以逻辑分片的理念对要计算的文件进行读取(默认128M)
    • Map阶段:将文件进行逻辑划分后,进行分割处理,MapTask(MT)并发进行
    • Reduce阶段:对Map阶段处理好的数据进行汇总,ReduceTask(RT)并发进行
    • 一个MR编程模型只能包含一个Map阶段,和一个Reduce阶段,如果业务复杂,只能写多个MR程序。
    • 一个MR程序可以没有Reduce阶段,如果Map阶段的输出结果恰好是业务所需的话
hadoop常用序列化类型

MapReduce的进程

一个完整的MR程序在分布式运行时有三类实例进程

  • MapAppMaster:负责整个MR程序的过程调度及状态协调
  • MapTask:负责Map阶段的数据处理流程
  • ReduceTas:负责Reduce阶段的数据处理流程
MR程序的编程规范
  • 驱动类:负责提交job
  • Mapper:继承Hadoop提供的Map类,指定map阶段输入,输出的KV类型。重写map()方法,在里面编写业务逻辑代码。map()方法(MapTask进程)每有一个键值对,调用一次
  • Reduce:继承Hadoop提供的Reduce类,指定reduce阶段输入,输出的KV类型。重写reduce()方法,在里面编写业务逻辑代码。reduce()方法,每有一组相同的K的KV组,调用一次。
手写官方wordCount案例
  • Driver类:
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取配置对象以及job类
        Configuration con = new Configuration();
        Job job = Job.getInstance(con);
        //指定本Driver的类型
        job.setJarByClass(WordCountDriver.class);
        //指定mapper和reducer的类型
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReduce.class);
        //指定mapper输出KV的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //指定最终输出KV的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //指定输入输出路径
        FileInputFormat.setInputPaths(job,new Path("D:\hello.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D:\output"));
        //提交job true:控制台打印执行过程 false:不打印
        job.waitForCompletion(true);

    }
}
  • Mapper类
public class WordCountMapper extends Mapper {
    //重写mapper方法
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //map阶段输出的V都是1,打标记
        final IntWritable v = new IntWritable(1);
        Text k = new Text();
        //读取,切割,输出
        String str = value.toString();
        String[] strs = str.split(" ");
        for (String word : strs) {
            k.set(word);
            context.write(k,v);
        }
    }
}
  • Reducer类:
public class WordCountReduce extends Reducer  {
    //重写reduce方法
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
    	int sum = 0;
        //累加 ,输出
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key,new IntWritable(sum));
    }
}
MR的提交方式
  • 本地运行:自测
  • 打包后上传到集群运行:hadoop jar jar包路径 运行程序全类名 输入路径 输出路径
  • 在Windows上向集群提交job(详见笔记)
Hadoop的序列化 序列化的概述
  • 序列化:把内存中的对象,转换成字节序列(或其他数据传输协议),以便于存储到磁盘(持久化)和网络传输
  • 反序列化:将收到的字节序列(或其他传输协议),或是磁盘的持久化数据,转换成内存中的对象
  • 为何要序列化:一般情况下,存活的对象只能存在内存中,关机断电则消失。且只能由本地的进程使用,不能发送到别的机器。序列化可以存储存活的对象,也可以将存活的对象发给远程计算机
  • 为何不用java的序列化:java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化之后,会附带很多的额外信息(各种校验信息,Header,继承体系等),不利于在网络中的高速传输。所以Hadoop有一套自己的序列化机制——Writable。紧凑、高效、可扩展。
自定义bean对象的序列化步骤
//1.必须实现Writable接口
public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    //2.反序列化时,要调用空参构造,如无定义其他有参构造可不特意声明
    public FlowBean(){
        super();
    }
    //3.重写序列化方法
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    //4.重写反序列化方法,序列化方法和反序列化方法顺序完全一致!
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
    //5.想把结果展示在文件中,需要重写toString()
    @Override
    public String toString() {
        return upFlow + "t" + downFlow + "t" +sumFlow;
    }
}
手写序列化案例
  • FlowBean如上
  • FlowDriver
public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //获取配置对象以及job类
        Configuration con = new Configuration();
        Job job = Job.getInstance(con);
        //指定本Driver的类型
        job.setJarByClass(FlowDriver.class);
        //指定mapper和reducer的类型
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReduce.class);
        //指定mapper输出KV的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        //指定最终输出KV的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //指定输入输出路径
        FileInputFormat.setInputPaths(job,new Path("D:\尚硅谷大数据\录屏+笔记+资料\7.hadoop\02.资料\07_测试数据\phone_data"));
        FileOutputFormat.setOutputPath(job,new Path("D:\testMapReduce\output"));
        //提交job true:控制台打印执行过程 false:不打印
        job.waitForCompletion(true);
    }
}
  • FlowMapper
public class FlowMapper extends Mapper {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //获取一行数据(map是一行一行读取)
        String line = value.toString();
        //分割
        String[] strs = line.split("t");
        //获取各个数据
        String phone = strs[1];
        String upFlow = strs[strs.length - 3];
        String downFlow = strs[strs.length - 2];
        FlowBean outValue = new FlowBean();
        outValue.setUpFlow(Long.parseLong(upFlow));
        outValue.setDownFlow(Long.parseLong(downFlow));
        outValue.setSumFlow(Long.parseLong(upFlow)+Long.parseLong(downFlow));
        //写出
        context.write(new Text(phone),outValue);
    }
}
  • FlowReducer
public class FlowReduce extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

        int sum_upFlow = 0;
        int sum_downFlow = 0;
        //累加各个参数
        for (FlowBean value : values) {
            sum_upFlow += value.getUpFlow();
            sum_downFlow += value.getDownFlow();
        }
        FlowBean outValue = new FlowBean();
        outValue.setUpFlow(sum_upFlow);
        outValue.setDownFlow(sum_downFlow);
        outValue.setSumFlow(sum_downFlow+sum_upFlow);
        //写出
        context.write(key,outValue);
    }
}
MapReduce框架原理(重点) InputFormat数据输入


简略描述:InputFormat --> map --> reduce --> OutputFormat
详细描述:InputFormat --> map sort --> copy sort group reduce --> OutputFormat

InputFormat继承体系

– InputFormat 抽象类的子实现类是 FileInputFormat
– FileInputFormat 类的子实现类是 TextInputFormat

在InputFormat中有两个抽象方法:

切片与MapTask并行度决定机制

MapTask的并行度决定Map阶段的任务处理并发度,进而影响整个Job的处理速度

  • 问题:MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

  • 切片:

    • 数据块:Block,是HDFS在物理上吧数据分成一块块的,是HDFS存储的数据单位
    • 数据切片:只是在逻辑上对输入进行分片,不会在磁盘上进行切分。数据切片是MapReduce程序输入数据的单位,一个切片会对应启动一个MapTask
    • 切片大小默认情况下等于切块大小,目的是计算机读取数据时不会跨机器读取,提高数据读取效率
    • 切片的时候不考虑数据的整体集,默认情况下对单个文件进行切片
源码分析——InputFormat的默认实现类是?

源码分析——Hadoop的默认切片规则(getSplits()的源码分析)
 public List getSplits(JobContext job) throws IOException {
	 //计时器,记录该次切片过程用时
    StopWatch sw = new StopWatch().start();
	
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
	//跟进一步后得出:maxSize=Long.MAX_VALUE,可通过配置文件改变
    long maxSize = getMaxSplitSize(job);
    // generate splits
    List splits = new ArrayList();
    List files = listStatus(job);
	
    boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
    for (FileStatus file: files) {
		//不忽略目录,对Job中设置的输入路径中的文件以及子目录中的文件全都处理
      if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      Path path = file.getPath();
	  //获取文件大小
      long length = file.getLen();
      if (length != 0) {
		  //获取当前文件所对应的文件块信息
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
		
        if (isSplitable(job, path)) {
			//获取当前文件块大小
          long blockSize = file.getBlockSize();
			
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
			//当前文件剩余大小
          long bytesRemaining = length;
			
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
			/
        int patitionNum ;
        String phone = text.toString();
        if(phone.startsWith("136")){
            patitionNum = 0;
        }else if(phone.startsWith("137")){
            patitionNum = 1;
        }else if(phone.startsWith("138")){
            patitionNum = 2;
        }else if(phone.startsWith("139")){
            patitionNum = 3;
        }else {
            patitionNum = 4;
        }
        return patitionNum;
    }
}
  • 在Driver类里面添加配置:
//指定RT数量,即分区数量
job.setNumReduceTasks(5);
//指定自定义分区器类的class
job.setPartitionerClass(FlowPartition.class);
  • 执行结果如下:
  • 分区器注意事项:
    • 当RT的数量设置 > 逻辑代码中实际用到的分区数,会产生空白的分区文件
    • 当RT的数量设置 < 逻辑代码中实际用到的分区数,报错
    • 当RT的数量设置 = 1时,结果会输出到一个文件中,代码论证如下:
Hadoop的比较排序 排序概述
  • MT和RT都会对数据按照Key进行排序,且该操作属于默认操作,无论逻辑上是否需要
  • 默认的排序规则是字典序,且该排序的实现方法是快速排序
  • 在MT中的排序:
    1、在环形缓冲区中存储达到阈值时,会对缓冲区中的数据进行一次排序,并将这些有序数据溢写到磁盘上,
    2、当所有的MT执行完毕后,会对磁盘上的所有溢写文件进行归并排序
  • 在RT中的排序:从MT上远程拷贝数据文件,如果文件大小超过阈值,则溢写到磁盘上。否则存储在内存中。
    1、当磁盘上的文件数目达到阈值,则进行一次归并排序生成一个更大的文件
    2、当内存中的文件大小或者数量达到阈值,则进行一次合并后溢写到磁盘上。
    3、当所有数据拷贝完毕后,RT统一对内存和磁盘上的所有数据进行一次归并排序
排序的分类
  • 1、部分排序(区内排序)
  • 2、全排序(效率低)
  • 3、辅助排序
  • 4、二次排序
全局排序的实现方式一:实现WritableComparable接口
  • 让参与比较的对象实现WritableComparable接口,并实现compareTo方法,自定义比较规则
  • 这种情况,运行时Hadoop会自动生成比较器对象WritableComparator。
  • 需求:按总流量进行排序(默认降序)
  • Bean对象:
public class ComparableBean implements WritableComparable {
	 //自定义排序规则
    @Override
    public int compareTo(ComparableBean bean) {
        return bean.getSumFlow().compareTo(this.sumFlow);
    }
}
  • Mapper对象:此时要注意,参与排序的对象类型要作为输出的K
public class ComparableMapper extends Mapper {}
  • Reducer对象:同上,且无需做操作,为了防止对数据进行分组合并,展示不全,直接遍历输出即可
public class ComparableReduce extends Reducer<**ComparableBean**,Text,ComparableBean,Text> {
    @Override
    protected void reduce(ComparableBean key, Iterable values, Context context) throws IOException, InterruptedException {
        //不用做任何操作,为了防止reduce分组数据展示不全,遍历输出即可
        for (Text value : values) {
            context.write(key,value);
        }
    }
}
全局排序的实现方式二:自定义Comparator比较器对象
  • WritableComparator是Hadoop默认的比较器对象,如果想自定义,则需要继承他,重写compare()方法,自定义比较规则
  • 在该类中通过无参构造,将自定义比较器与要参与排序的bean进行关联
  • 最后在Driver中指定自定义比较器
  • 自定义比较器对象:
public class Comparator extends WritableComparator {

    //无参构造,指定当前自定义比较器为哪个类服务
    //传true,绑定的bean对象才会初始化
    public Comparator() {
        super(ComparatorBean.class,true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        ComparatorBean beanA = (ComparatorBean) a;
        ComparatorBean beanB = (ComparatorBean) b;
        return -beanA.getSumFlow().compareTo(beanB.getSumFlow());
    }
}
  • Driver类:
//指定自定义比较器
job.setSortComparatorClass(Comparator.class);
区内排序的实现
  • 只需要在分区器上,将输出的KV类型调换,在Driver里面指定RT数量和分区器类型即可实现
  • 区内排序,将分区器和比较器进行了解耦
  • Patitioner:
public class FlowBeanPartitioner extends Partitioner {}
  • Driver:
 job.setNumReduceTasks(5);
 job.setPartitionerClass(FlowBeanPartitioner.class);
通过源码分析,Hadoop选取比较器对象的规则是什么
  • 切入点分析:想要直接排序,肯定是在Map方法初始化的时候就把这些加载好了,所以去MapTask初始化阶段去找
  • 源码分析:
通过源码分析,Hadoop自身数据类型的比较器对象如何安排
  • 以IntWritable为例:在类进行初始化的时候,就维护了<自身类型,WritableComparable类型>,所以Hadoop自带的数据类型不需要什么操作就可以进行比较排序
Combiner合并 Combiner概述
  • Combiner是MR程序中,Mapper和Reducer之外的一个组件
  • 其父类就是Reducer
  • 跟Reducer的区别在于:
    • Reducer是接收全局Mapper的输出结果
    • Combiner是在每一个MT里执行
  • Combiner的意义:提升MR程序运行效率,对每一个MT的输出汇总合并,减少了RT在MT里拉取数据的数量,也减少了磁盘的IO开销
  • 合并其实是在Reducer里面做的,Combiner分担了Reducer的压力,但是仍然需要Reducer,因为要对所有MT的文件进行合并
  • Combiner的使用步骤:
    • 自定义Combiner类,直接继承Reducer类
    • 在Driver里面指定Combiner类型
Combiner的代码实现
  • 自定义Combiner类:
//Combiner输入KV类型为Map端输出KV类型,输出KV类型为Reduce端输入KV类型
public class WordCombiner extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        //跟Reduce端的操作一模一样
        int sum = 0;
        //累加 ,输出
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key,new IntWritable(sum));
    }
}
  • Driver:
//指定Combiner类型
        job.setCombinerClass(WordCombiner.class);
Combiner效果展示

Combiner不适用的场景
  • Reduce端处理的数据考虑到多个MapTask的数据的整体集时就不能提前合并了。
  • 如,求平均值,Map不能进行合并
OutPutFormat OutPutFormat接口实现

自定义OutputFormat使用场景
  • 当我们对MR最终的结果有个性化制定的需求,如修改输出文件的名字和路径等。可以通过自定义OutputFormat来实现
  • 需求:过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。
  • 自定义OutPutFormat类:
//输出的时候只有key
public class LogOutputFormat extends FileOutputFormat {
    //获取 RecordWriter 来进行数据写出
    @Override
    //若直接new RecordWriter的话,会要求实现里面的方法,所以提出来,单独写一个类,继承RecordWriter
    public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        LogRecordWriter recordWriter = new LogRecordWriter(job);
        return recordWriter;
    }
}
  • 自定义RecordWriter类:
public class LogRecordWriter extends RecordWriter {
    //用hadoop提供的api创建输入、输出流对象
    private String atguiguPath = "D:\testMapReduce\atguigu\atguigu.txt";
    private String otherPath = "D:\testMapReduce\other\other.txt";
    FSDataOutputStream atguigu;
    FSDataOutputStream other;
    FileSystem fs;

    public LogRecordWriter(TaskAttemptContext job) throws IOException {
        fs = FileSystem.get(job.getConfiguration());
        atguigu = fs.create(new Path(atguiguPath));
        other = fs.create(new Path(otherPath));
    }
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String line = key.toString();
        if(line.contains("atguigu")){
            atguigu.writeBytes(line+"n");
        }else {
            other.writeBytes(line+"n");
        }
    }
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        IOUtils.closeStream(other);
        IOUtils.closeStream(atguigu);
        IOUtils.closeStream(fs);
    }
}
  • Driver类:
//指定OutPutFormat
job.setOutputFormatClass(LogOutputFormat.class);
//指定输出路径
FileOutputFormat.setOutputPath(job,new Path("D:\testMapReduce\output"));
  • Mapper和Reducer只需要遍历输出即可
  • 执行结果如下:
Join的应用 Reduce Join
  • 在MR程序中计算数据的时候,出现输入文件是多个,且文件之间存在关联性,需要在计算过程中通过两个文件之间相互关联才能得到结果
  • 实现步骤:
    • 在Map阶段对文件进行数据整合,并且让关联字段作为输出数据的Key
    • 当一组相同数据的key进入Reduce阶段后,先把两个文件数据分离出来,分别放到各自的对象中维护
    • 把当前一组维护好的数据进行关联
  • 可能会导致数据倾斜:分区不同,Reduce端有的RT可能会承担很高的负载
  • Map端:
public class JoinMapper extends Mapper {

    FileSplit fileSplit;
    String fileName;
    Text outk = new Text();
    OrderPd outv = new OrderPd();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //分辨数据来源,用文件name
        //获取切片文件,用FileSplit接
        fileSplit = (FileSplit) context.getInputSplit();
        //获取文件名
        fileName = fileSplit.getPath().getName();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //对多个文件进行数据整合,并且让关联字段作为输出数据的key
        String line = value.toString();
        //分割
        String[] datas = line.split("t");
        //用文件名确认数据来源,合并
        //order文件:1001	01	1
        if(fileName.contains("order")){
            outk.set(datas[1]);
            outv.setOrderId(Integer.parseInt(datas[0]));
            outv.setPid(Integer.parseInt(datas[1]));
            outv.setAmount(Integer.parseInt(datas[2]));
            outv.setPname("");
            outv.setTitle(fileName);
        }else {
            //数据来源pd文件:01	小米
            outk.set(datas[0]);
            outv.setOrderId(0);
            outv.setPid(Integer.parseInt(datas[0]));
            outv.setAmount(0);
            outv.setPname(datas[1]);
            outv.setTitle(fileName);
        }
        context.write(outk,outv);
    }
}
  • Reducer
public class JoinReduce extends Reducer {
    List pdList = new ArrayList();
    OrderPd pd = new OrderPd();
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        //核心逻辑(先把数据根据来源进行分离,然后做join)
        for (OrderPd value : values) {
            String title = value.getTitle();
            if(title.contains("order")){
                //hadoop自身改写了jvm规则,需要自己new一个对象去接
                try {
                    OrderPd thisOrderPd = new OrderPd();
                    BeanUtils.copyProperties(thisOrderPd,value);
                    pdList.add(thisOrderPd);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }else {
                try {
                    BeanUtils.copyProperties(pd,value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        //做join关联
        for (OrderPd orderPd : pdList) {
            orderPd.setPname(pd.getPname());
            context.write(orderPd,NullWritable.get());
        }
        //清空list
        pdList.clear();
    }
}
Map Join
  • 考虑整体MR的执行效率,且业务场景为:一个小文件和一个大文件进行关联时,可用Map join。也是解决数据倾斜的很有效的办法
  • 实现步骤:
    • 将小文件的数据映射到内存中的一个容器维护起来
    • 当MT要处理大文件的数据时,没读取一行数据,就根据当前行总的关联字段到内存的容器里获取对象的信息
    • 封装结果输出
  • 代码实现略
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/329725.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号