栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据高级开发工程师——Hadoop学习笔记(5)

大数据高级开发工程师——Hadoop学习笔记(5)

文章目录
  • Hadoop进阶篇
    • MapReduce:Hadoop分布式并行计算框架
      • 序列化与反序列化
        • 1. 需求
        • 2. 代码实现
      • MapReduce当中的排序
        • 1. 可排序的 Key
        • 2. 排序的种类
        • 3. 二次排序
      • MapReduce中的Combiner
        • 1. 基本介绍
        • 2. 需求
      • MapReduce中的GroupingComparator分组详解
        • 1. 自定义 WritableComparator
        • 2. 需求
        • 3. 需求分析
        • 4. 代码实现
      • 自定义OutputFormat
        • 1. 需求
        • 2. 分析
        • 3. 代码实现
      • shuffle中数据压缩
        • 1. hadoop 支持的压缩算法比较
        • 2. 如何开启压缩
      • 计数器与累加器
        • 1. hadoop 内置计数器
        • 2. 自定义计数器

Hadoop进阶篇 MapReduce:Hadoop分布式并行计算框架 序列化与反序列化
  • 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
  • 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
  • Java 的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输,所以,hadoop 自己开发了一套序列化机制(Writable):
    • 精简,高效。
    • 不用像 java 对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。
  • Writable 是 Hadoop 的序列化格式的接口,一个类要支持可序列化,只需实现这个接口即可。
  • 另外 Writable 有一个子接口是 WritableComparable,其即可实现序列化,也可以对 Key 进行比较,这里可以通过自定义 key 利用 WritableComparable来实现排序功能。
  • 在企业开发中往往常用的基本序列化类型不能满足所有需求,比如在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。
  • 具体实现 bean对象序列化:
    • 1、必须实现 Writable 接口;
    • 2、反序列化时,需要反射调用空参构造函数,所以必须有空参构造。
1. 需求

  • 现有用户手机上网详单数据,求取每个手机号的上行包个数之和、下行包个数之和,以及上行总流量之和、下行总流量之和。
2. 代码实现
  • 定义 JavaBean 对象
public class FlowBean implements Writable {
    
    private int upPackNum;
    
    private int downPackNum;
    private int upPayLoad;
    private int downPayLoad;
    
    public FlowBean() {
    }
    @Override
    public void write(DataOutput out) throws IOException {
        // 调用序列化方法时,要用于类型匹配的write方法,并且要记住序列化顺序
        out.writeInt(upPackNum);
        out.writeInt(downPackNum);
        out.writeInt(upPayLoad);
        out.writeInt(downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // 反序列化的顺序要与序列化保持一致,并使用匹配类型的方法
        this.upPackNum = in.readInt();
        this.downPackNum = in.readInt();
        this.upPayLoad = in.readInt();
        this.downPayLoad = in.readInt();
    }

    // setter、getter、toString 省略 ...
}
  • 定义 Mapper 类:
public class FlowMapper extends Mapper {
    private FlowBean flowBean;
    private Text text;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        this.flowBean = new FlowBean();
        this.text = new Text();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] slices = value.toString().split("t");
        
        text.set(slices[1]);
        
        flowBean.setUpPackNum(Integer.parseInt(slices[6]));
        flowBean.setDownPackNum(Integer.parseInt(slices[7]));
        flowBean.setUpPayLoad(Integer.parseInt(slices[8]));
        flowBean.setDownPayLoad(Integer.parseInt(slices[9]));
        
        context.write(text, flowBean);
    }
}
  • 定义 Reducer 类:
public class FlowReducer extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int upPackNum = 0;
        int downPackNum = 0;
        int upPayLoad = 0;
        int downPayLoad = 0;

        for (FlowBean value : values) {
            upPackNum += value.getUpPackNum();
            downPackNum += value.getDownPackNum();
            upPayLoad += value.getUpPayLoad();
            downPayLoad += value.getDownPayLoad();
        }
        context.write(key, new Text(upPackNum + "t" + downPackNum + "t" + upPayLoad + "t" + downPayLoad));
    }
}
  • 定义 main 程序入口:
public class FlowMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), FlowMain.class.getSimpleName());
        job.setJarByClass(FlowMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(FlowMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setReducerClass(FlowReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new FlowMain(), args);
        System.exit(run);
    }
}
MapReduce当中的排序 1. 可排序的 Key
  • 排序是 MapReduce 框架中最重要的操作之一,MapTask 和 ReduceTask 均会对数据按照 Key 进行排序。
  • 该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
  • 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
  • 对于ReduceTask,它从每个执行完成的MapTask上远程拷贝相应的数据文件
    • 如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中;
    • 如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;
    • 如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。
    • 当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
2. 排序的种类
  • 部分排序:MapReduce 根据输入记录的键对数据集排序,保证输出的每个文件内部有序。
  • 全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个 ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完成丧失了 MapReduce 所提供的并行能力。
  • 辅助排序:在 Reduce 端对 Key 进行分组,应用于在接收的 key 和 bean 对象时,想让一个或几个字段相同(全部字段比较不相同)的 key 进入到同一个 reduce 方法时,可以采用分组排序
  • 二次排序:在 MR 编程中,需要先按输入数据的某一列排序,如果相同,再按另一列排序。MR 自带的类型作为 key 无法满足需求,往往需要自定义 JavaBean 作为 Map 输出的Key,JavaBean 中使用 compareTo 方法指定排序规则。
3. 二次排序
  • 样本数据如下,每条数据有5个字段,分别是手机号、上行包总个数、下行包总个数、上行总流量、下行总流量
13480253104	3	3	180	180
13502468823	57	102	7335	110349
13560439658	33	24	2034	5892
13600217502	18	138	1080	186852
13602846565	15	12	1938	2910
13660577991	24	9	6960	690
13719199419	4	0	240	0
13726230503	24	27	2481	24681
13760778710	2	2	120	120
13823070001	6	3	360	180
13826544101	4	0	264	0
13922314466	12	12	3008	3720
13925057413	69	63	11058	48243
13926251106	4	0	240	0
13926435656	2	4	132	1512
15013685858	28	27	3659	3538
15920133257	20	20	3156	2936
15989002119	3	3	1938	180
18211575961	15	12	1527	2106
18320173382	21	18	9531	2412
84138413	20	16	4116	1432
  • 需求:先对下行包总数升序排序;若相等,再按上行总流量进行降序排序。
  • 自定义JavaBean:用于封装数据及定义排序规则
public class FlowSortBean implements WritableComparable {
    private String phone;
    private Integer upPackNum;
    private Integer downPackNum;
    private Integer upPayLoad;
    private Integer downPayLoad;
    @Override
    public int compareTo(FlowSortBean o) {
        int result = this.downPackNum.compareTo(o.downPackNum);
        if (result == 0) {
            result = - this.upPayLoad.compareTo(o.upPayLoad);
        }
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeInt(upPackNum);
        out.writeInt(downPackNum);
        out.writeInt(upPayLoad);
        out.writeInt(downPayLoad);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.phone = in.readUTF();
        this.upPackNum = in.readInt();
        this.downPackNum = in.readInt();
        this.upPayLoad = in.readInt();
        this.downPayLoad = in.readInt();
    }
    // setter、getter、toString 省略...
}
  • 自定义 Mapper 类:
public class FlowSortMapper extends Mapper {
    private FlowSortBean flowSortBean;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        flowSortBean = new FlowSortBean();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] slices = value.toString().split("t");
        
        flowSortBean.setPhone(slices[0]);
        flowSortBean.setUpPackNum(Integer.parseInt(slices[1]));
        flowSortBean.setDownPackNum(Integer.parseInt(slices[2]));
        flowSortBean.setUpPayLoad(Integer.parseInt(slices[3]));
        flowSortBean.setDownPayLoad(Integer.parseInt(slices[4]));
        
        context.write(flowSortBean, NullWritable.get());
    }
}
  • 自定义 Reducer 类:
public class FlowSortReducer extends Reducer {
    @Override
    protected void reduce(FlowSortBean key, Iterable values, Context context) throws IOException, InterruptedException {
        // 经过排序后,直接输出
        context.write(key, NullWritable.get());
    }
}
  • 定义 main 程序入口:
public class FlowSortMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "FlowSort");
        job.setJarByClass(FlowSortMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(FlowSortMapper.class);
        job.setMapOutputKeyClass(FlowSortBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(FlowSortReducer.class);
        job.setOutputKeyClass(FlowSortBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new FlowSortMain(), args);
        System.exit(run);
    }
}
  • 运行输出结果:

MapReduce中的Combiner 1. 基本介绍
  • Combiner 类本质也是 Reducer 聚合,Combiner 类继承 Reducer 类,Combiner 是运行在 Map 端,对 mapTask 内部的结果做聚合。
  • 作用:可以减少 mapTask 落盘及向 reduceTask 传输的数据量。
  • 是否可以做 map 端的 combine:并非所有的 MR job 都适用 combine,原则是无论使用不使用combine,都不能对最终的结果造成影响,比如下边的例子,就不适用:
Mapper:
3 5 7 => (3+5+7)/2 = 5
2 6 => (2+6)/2 = 4

Reducer
(3+5+7+2+6)/5 = 23/5  不等于 (5+4)/2 = 9/2
2. 需求
  • 对之前的 wordCount 单词计数统计的例子,加上Combiner过程,实现map端的数据进行汇总之后,再发送到reduce端,减少数据的网络拷贝。
  • 在main方法中加入:
job.setCombinerClass(MyReducer.class);
  • 运行程序,观察控制台输出有combiner和没有combiner的区别

MapReduce中的GroupingComparator分组详解

  • GroupingComparator 是 MapReduce 当中 Reduce 端决定哪些数据作为一组,调用一次 reduce 的逻辑。
  • 默认是 key 相同的K-V对,作为同一组,每组调用一次 reduce 方法。
  • 可以自定义 GroupingComparator,实现自定义的分组逻辑。
1. 自定义 WritableComparator
  • 继承 WritableComparator 类,并重写 compare 方法
@Override
public int compare(WritableComparable a, WritableComparable b) {
        // 比较的业务逻辑
        return result;
}
  • 创建一个构造器,将比较对象的类传给父类:
protected OrderGroupingComparator() {
        super(OrderBean.class, true);
}
2. 需求
  • 现有订单数据如下,需要求取每个订单当中金额最大的商品:
订单id				商品id			成交金额
Order_0000001		Pdt_01			222.8
Order_0000001		Pdt_05			25.8
Order_0000002		Pdt_03			322.8
Order_0000002		Pdt_04			522.4
Order_0000002		Pdt_05			822.4
Order_0000003		Pdt_01			222.8
3. 需求分析

4. 代码实现
  • 自定义OrderBean
public class OrderBean implements WritableComparable {
    private String orderId;
    private Double price;
    
    @Override
    public int compareTo(OrderBean o) {
        int result = this.orderId.compareTo(o.orderId);
        if (result == 0) {
            result = -this.price.compareTo(o.price);
        }
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    // setter、getter、toString 省略
}
  • 自定义Mapper类
public class GroupMapper extends Mapper {
    private OrderBean orderBean;
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        orderBean = new OrderBean();
    }
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] slices = value.toString().split("t");
        orderBean.setOrderId(slices[0]);        
        orderBean.setPrice(Double.valueOf(slices[2]));
        
        context.write(orderBean, NullWritable.get());
    }
}
  • 自定义分区类
public class GroupPartitioner extends Partitioner {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        // 将订单号相同的分在一个区
        return orderBean.getOrderId().hashCode() % numPartitions;
    }
}
  • 自定义分组类
public class MyGroup extends WritableComparator {
    public MyGroup() {
        // 分组类,要对OrderBean类型的key进行分组
        super(OrderBean.class, true);
    }

    @Override
    public int compare(Object a, Object b) {
        OrderBean a1 = (OrderBean) a;
        OrderBean b1 = (OrderBean) b;
        return a1.getOrderId().compareTo(b1.getOrderId());
    }
}
  • 自定义Reducer类
public class GroupReducer extends Reducer {
    @Override
    protected void reduce(OrderBean key, Iterable values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
  • 定义程序入口类
public class GroupMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //获取job对象
        Job job = Job.getInstance(super.getConf(), "group");
        job.setJarByClass(GroupMain.class);

        //第一步:读取文件,解析成为key,value对
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        //第二步:自定义map逻辑
        job.setMapperClass(GroupMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        //第三步:分区
        job.setPartitionerClass(GroupPartitioner.class);

        //第四步:排序 已经做了

        //第五步:规约 combiner  省掉

        //第六步:分组 自定义分组逻辑
        job.setGroupingComparatorClass(MyGroup.class);

        //第七步:设置reduce逻辑
        job.setReducerClass(GroupReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        //第八步:设置输出路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
        System.exit(run);
    }
}
  • 运行程序,查看输出结果:
Order_0000001	222.8
Order_0000001	25.8
Order_0000002	822.4
Order_0000002	522.4
Order_0000002	322.8
Order_0000003	222.8
  • 拓展:如何求每个组当中的top2的订单金额数据???
int num = 0;
for(NullWritable value: values) {
	context.write(key, value);
	num++;
	if(num == 2)
		break;
}
自定义OutputFormat 1. 需求
  • 现在有一些订单的评论数据,需求,将订单的好评与其他评论(中评、差评)进行区分开来,将最终的数据分开到不同的文件夹下面去
  • 数据内容参见资料文件夹,其中数据第九个字段表示好评,中评,差评。0:好评,1:中评,2:差评

2. 分析
  • 程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现。
  • 实现要点:
    • 在mapreduce中访问外部资源
    • 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()
3. 代码实现
  • 自定义一个 OutputFormat:
public class MyOutputFormat extends FileOutputFormat {
    static class MyRecordWriter extends RecordWriter {
        private FSDataOutputStream goodStream;
        private FSDataOutputStream badStream;

        public MyRecordWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
            this.goodStream = goodStream;
            this.badStream = badStream;
        }

        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            if (key.toString().split("t")[9].equals("0")) {// 好评
                goodStream.write(key.toString().getBytes());
                goodStream.write("rn".getBytes());
            } else { // 中评或差评
                badStream.write(key.toString().getBytes());
                badStream.write("rn".getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (badStream != null) badStream.close();
            if (goodStream != null) goodStream.close();
        }

    }

    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(context.getConfiguration());
        Path goodCommentPath = new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/main/resources/output/good.txt");
        Path badCommentPath = new Path("/Volumes/F/MyGitHub/bigdata/hadoop-demo/src/main/resources/output/bad.txt");

        FSDataOutputStream goodOutputStream = fs.create(goodCommentPath);
        FSDataOutputStream badOutputStream = fs.create(badCommentPath);

        return new MyRecordWriter(goodOutputStream, badOutputStream);
    }
}
  • 定义程序入口类:
public class OutputFormatMain extends Configured implements Tool {
    static class OutputFormatMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), OutputFormatMain.class.getSimpleName());
        job.setJarByClass(OutputFormatMain.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));
        
        job.setMapperClass(OutputFormatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        
        // 使用默认的reduce类的逻辑
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        job.setOutputFormatClass(MyOutputFormat.class);
        MyOutputFormat.setOutputPath(job, new Path(args[1]));
        
        job.setNumReduceTasks(2);
        
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new OutputFormatMain(), args);
        System.exit(run);
    }
}
  • 运行查看输出结果

shuffle中数据压缩
  • shuffle 阶段,可以看到数据通过大量的拷贝,从map阶段输出的数据,都要通过网络拷贝,发送到reduce阶段,这一过程中,涉及到大量的网络IO,如果数据能够进行压缩,那么数据的发送量就会少得多。
  • 从map阶段输出的数据,都要通过网络拷贝,发送到reduce阶段,这一过程中,涉及到大量的网络IO,如果数据能够进行压缩,那么数据的发送量就会少得多
  • MapReduce的执行流程:
MapReduce
input
mapper
shuffle
partitioner、sort、combiner、【compress】、group
reducer
output
  • 文件压缩有两个好处:节约磁盘空间,加速数据在网络和磁盘上的传输。
  • 查看 hadoop 支持的压缩算法:
hadoop checknative
1. hadoop 支持的压缩算法比较
压缩格式工具算法文件扩展名是否可切分对应使用的Java类
DEFLATEDEFLATE.deflateorg.apache.hadoop.io.compress.DefaultCodec
GzipgzipDEFLATE.gzorg.apache.hadoop.io.compress.GzipCodec
bzip2bzip2bzip2.bz2org.apache.hadoop.io.compress.BZip2Codec
LZOlzopLZO.lzocom.hadoop.compression.lzo.LzopCodec
LZ4LZ4.lz4org.apache.hadoop.io.compress.Lz4Codec
SnappySnappy.snappyorg.apache.hadoop.io.compress.SnappyCodec
  • 压缩速率比较
压缩算法原始文件大小压缩后的文件大小压缩速率解压缩速度
gzip8.3GB1.8GB17.5MB/s58MB/s
bzip28.3GB1.1GB2.4MB/s9.5MB/s
LZO-bset8.3GB2GB4MB/s60.6MB/s
LZO8.3GB2.9GB135MB/s410MB/s
Snappy8.3GB1.8GB172MB/s409MB/s
  • 常用的压缩算法主要有LZO和snappy
2. 如何开启压缩
  • 在代码中设置压缩:
Configuration configuration = new Configuration();
// 设置 map 阶段压缩
configuration.set("mapreduce.map.output.compress", "true");
configuration.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
// 设置 reduce 阶段的压缩
configuration.set("mapreduce.output.fileoutputformat.compress", "true");
configuration.set("mapreduce.output.fileoutputformat.compress.type", "RECORD");
configuration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
  • 修改 mapred-site.xml 进行 MapReduce 压缩:所有节点都要修改mapred-site.xml,修改完成之后记得重启集群

	mapreduce.map.output.compress
	true


	mapreduce.map.output.compress.codec
	org.apache.hadoop.io.compress.SnappyCodec



       
	mapreduce.output.fileoutputformat.compress
	true

        
	mapreduce.output.fileoutputformat.compress.type
	RECORD

        
	mapreduce.output.fileoutputformat.compress.codec
	org.apache.hadoop.io.compress.SnappyCodec 

计数器与累加器
  • 计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到map 或reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。
1. hadoop 内置计数器
计数器对应的Java类
MapReduce任务计数器org.apache.hadoop.mapreduce.TaskCounter
文件系统计数器org.apache.hadoop.mapreduce.FileSystemCounter
FileInputFormat计数器org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
FileOutputFormat计数器org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
作业计数器org.apache.hadoop.mapreduce.JobCounter
  • 每次mapreduce执行完成之后,我们都会看到一些日志记录出来,其中最重要的一些日志记录如下截图

2. 自定义计数器
  • 利用之前排序及序列化的案例,统计 map 端接收到的数据的条数。
  • Mapper 中,通过 context 上下文对象可以获取我们的计数器,并进行记录,通过 context 上下文对象,在 map 端使用计数器进行统计:

  • Reducer 中,通过 enum 枚举类型来定义计数器,统计 reduce 端数据的输入 key 有多少个,对应的 value 有多少个:

  • 运行程序查看输出结果

  • github 源代码地址:https://github.com/shouwangyw/bigdata/tree/master/hadoop-demo
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/629337.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号