栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

MapReduce框架原理之Shuffle机制

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

MapReduce框架原理之Shuffle机制

Shuffle机制

目录

Shuffle机制

一、Partition分区

1.什么是分区

2.默认的Partition分区

3.自定义Partition步骤

二、WritableComparable排序

1.默认排序

2.排序概述

3.排序分类

三、Combiner合并


MapReduce总流程:

        input >> map(map+sort) >>reduce(copy+sort+reduce) >> output map的sort+reduce的+copy+sort 组成shuffle

一、Partition分区

1.什么是分区

        要求将统计结果按照条件输出到不同文件中,这个过程就叫做分区。

2.默认的Partition分区

        默认分区是根据key的hashCode%reduceTask数量得到所有的分区号,用户无法控制key存储到哪个分区,默认reduceTask的数量为1,也可以在driver端进行设置。

3.自定义Partition步骤

1)自定义继承Partitioner,重写getPartition()方法

2)在Job驱动中,设置自定义Partitioner

3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

        注意:a.如果ReduceTask的数量>getPartition的结果数,则会产生多个空的输出文件part-r-000xx

                b.如果如果ReduceTask的数量

                c.如果ReduceTask数量=1,则不管MapTask端输出多少个分区文件,最终结果都会交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000

                d.分区号必须从零开始,逐一累加

实例:

在Hadoop序列化_lslslslslss的博客-CSDN博客中序列化的例子上进行改动

(1)增加一个分区类

public class ProvincePartitioner extends Partitioner {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        String phone = text.toString();
        String prephone = phone.substring(0,3);

        int partition;

        if ("136".equals(prephone)){
            partition = 0;
        }else if ("137".equals(prephone)){
            partition = 1;
        }else if ("138".equals(prephone)){
            partition = 2;
        }else if ("139".equals(prephone)){
            partition = 3;
        }else {
            partition = 4;
        }

        return partition;
    }
}

(2)在驱动函数中增加自定义数据分区设置和ReduceTask设置

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowDriver.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
		//指定自定义分区器
        job.setPartitionerClass(ProvincePartitioner.class);
        //同时指定相应数量的ReduceTask
        job.setNumReduceTasks(5);

        FileInputFormat.setInputPaths(job, new Path("F:\phone_data.txt"));
        FileOutputFormat.setOutputPath(job,new Path("F:\pp"));
        job.waitForCompletion(true);
    }
}

二、WritableComparable排序

1.默认排序

        MapTask和ReduceTask都会对数据按照key进行排序,这是Hadoop的默认行为,任何程序中的数据都会被排序,不管逻辑上是否需要。

默认排序按照字典顺序排序,且实现该排序的方法是快速排序。

2.排序概述

        MapTask:它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区的使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有的文件进行归并排序。

        ReduceTask:它从每个MapTask上远程复制相应的数据文件,如果文件大小超过一定的阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序生成一个更大的文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上,当所有数据复制完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

3.排序分类

(1)部分排序

        MapReduce根据输入记录的键对数据集排序,保证输出的每个文件内部有序。

(2)全排序

        最终输出结果只有一个文件,且文件内部有序。

(3)辅助排序

        GroupingComparator分组,在Reduce端对key进行分组。当接收key为bean对象时,想让一个或多个字段的key进入到同一个reduce方法,可以采用分组排序。

(4)二次排序

        在自定义排序过程中,如果comparaTo中的判断条件为两个即为二次排序

实例1——全排序:

(1)FlowBean对象在在需求1基础上增加了比较功能

public class FlowBean implements WritableComparable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow(){
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();

    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    @Override
    public int compareTo(FlowBean o) {
        if (this.sumFlow < o.sumFlow){
            return 1;//倒序
        }else if (this.sumFlow > o.sumFlow){
            return -1;
        }else{
            return 0;
        }
    }
}

(2)编写Mapper类

public class FlowMapper extends Mapper {
    private Text outV = new Text();
    private FlowBean outK = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] split = line.split("t");

        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);

        context.write(outK,outV);

    }
}

(3)编写Reducer类

public class FlowReducer extends Reducer {


    @Override
    protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value,key);
        }
    }
}

(4)编写Driver类

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowDriver.class);

        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //job.setPartitionerClass(ProvincePartitioner.class);
        //job.setNumReduceTasks(5);

        FileInputFormat.setInputPaths(job, new Path("F:、、input\part-r-00000"));
        FileOutputFormat.setOutputPath(job,new Path("F:\cc"));
        job.waitForCompletion(true);
    }
}

实例2——区内排序

(1)增加自定义分区类

public class ProvincePartitioner extends Partitioner {

    @Override
    public int getPartition(FlowBean flowBean,Text text, int i) {
        String phone = text.toString();
        String prephone = phone.substring(0,3);

        int partition;

        if ("136".equals(prephone)){
            partition = 0;
        }else if ("137".equals(prephone)){
            partition = 1;
        }else if ("138".equals(prephone)){
            partition = 2;
        }else if ("139".equals(prephone)){
            partition = 3;
        }else {
            partition = 4;
        }

        return partition;
    }
}

(2)在驱动类中添加分区类

// 设置自定义分区器
job.setPartitionerClass(ProvincePartitioner.class);

// 设置对应的ReduceTask的个数
job.setNumReduceTasks(5);

三、Combiner合并

(1)Combiner是MR程序中Mapper和Reducer之外的一种组件

(2)Combiner组件的父类就是Reducer

(3)Combiner和Reducer的区别在于位置

Combiner是在每一个MapTask所在的节点运行

Reducer是在接受全局所有Mapper的输出结果

(4)Combiner的意思就是对每一个MapTask的输出进行局部汇总

(5)Combiner能够应用的前提是不能影响最终的业务逻辑,且Combiner的KV应该可以跟Reducer中的输入KV类型对应起来

(6)实例:

方案一:

(1)增加一个WordCountCombiner类继承Reducer

public class WordCountCombiner extends Reducer {

private IntWritable outV = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        //封装outKV
        outV.set(sum);

        //写出outKV
        context.write(key,outV);
    }
}

(2)在WordcountDriver驱动类中指定Combiner

// 指定需要使用combiner,以及用哪个类作为combiner的逻辑
job.setCombinerClass(WordCountCombiner.class);

方案二:

        将WordcountReducer作为Combiner在WordcountDriver驱动类中指定

// 指定需要使用Combiner,以及用哪个类作为Combiner的逻辑
job.setCombinerClass(WordCountReducer.class);

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/667801.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号