栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

分布式计算框架MapReduce

分布式计算框架MapReduce

一.介绍

产生背景: web2.0时代到来,大量的数据需要处理,单机处理已经不能满足需求,分布式存储与计算进入历史舞台
是什么:是一个面向大数据处理的计算框架
基本特点:提供可靠的分布式计算,封装了细节开发流程简单,跨语言

二.运行流程


premap阶段

首先根据输入的文件,进行切片,一个block块就是一片
其中的每一片都会分配给一个map任务
一个计算节点可以并行执行多个map任务
分片完成后要按照回车符号进行kv格式化,k是字节偏移量,v是内容

map阶段

数据处理 Mapper
比如说将分片的数据的每行按照逗号分割
一个分片会产生一个map任务
一个map任务中会调用多次map方法(多少行就调用一次,map方法是按照行来进行处理的)
分区parttion
默认系统分区的代码如下:
![image.png](https://img-blog.csdnimg.cn/img_convert/d42b353afc88d04e54456fdadaa1a87f.png#clientId=u0122ff83-c9ff-4&from=paste&height=154&id=ufad7a530&margin=[object Object]&name=image.png&originHeight=308&originWidth=732&originalType=binary&ratio=1&size=102530&status=done&style=none&taskId=uc310c768-a5d8-4c57-8c6e-8d1ac9019a6&width=366)
map处理后的key的hash值与上int的最大值,然后模上reduce节点的数量
分区的目的:将kv对儿均匀的发送到reduce节点上,使得reduce节点负载均衡
排序 sort
分区完成后进行排序,按照key进行排序
本地合并 combiner(可选,一般计算平均值的时候不进行这部操作)
排序完成后,按照key相同的进行合并,value合并成一个可迭代的集合
一般在计算平均值的时候不进行这步操作,因为combiner实际上也是调用的reduce类的方法,也会进行局部的求平均,最后结果会变成将多个局部平均再求平均,结果就错了
将结果输出到本地

shuffle阶段

将各个节点上相同分区的数据进行合并然后排序

reduce阶段

该阶段会将key相同的进行合并,这个和combiner一样
合并完了执行reduce方法
每个reduce都会在产生对应的结果文件(hdfs文件)

三.案例 1.单词统计 需求分析

在 hdfs 目录/tmp/tianliangedu/input/wordcount 中有一系列文件,内容均为",“号分隔,
求按”,"号分隔的各个元素的出现频率,输出到目录/tmp/tianliangedu/output/个人用户名的 hdfs
目录中。

map
package com.fjh.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.stream.Stream;


public class Mapper01 extends Mapper {
    //为了节省空间,这里将每个第一次分割开的单词频率记为1
    private final static IntWritable one =new IntWritable(1);
    //暂时存储每个分割开的单词,节省空间
    private Text word  = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        //将单词按照逗号分割开来
        String[] splitResult = value.toString().split(",");
        //循环将每个单词存储进context对象中
        for(String e:splitResult){
            word.set(e);
            context.write(word,one);
        }
    }
}

reduce
package com.fjh.reduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class Reduce01 extends Reducer {
    private IntWritable result = new IntWritable();


    //将map传来的结果进行合并
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        //统计每个单词出现的频率
        for (IntWritable val : values) {
            //将 key 组中的每个词频数值 sum 到一起
            sum += val.get();
        }
        result.set(sum);
        //结果逐条输出
        context.write(key,result);

    }
}

2.求和 需求分析

在 hdfs 目录/tmp/tianliangedu/input/wordcount 目录中有一系列文件,内容为","号分
号分
隔,分隔后的元素均为数值类型、字母、中文,求所有出现的数值的和。

map
package com.fjh.mapper;

import com.fjh.util.RegexUtil;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class Mapper02 extends Mapper {
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split(",");
        for(String e:split){
            if(RegexUtil.matchNum(e)){
            context.write(new Text("求和结果:"),new LongWritable(Long.parseLong(e)));
            }
        }
    }
}
reduce
package com.fjh.reduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class Reduce02 extends Reducer {
    private IntWritable result = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        //统计每个单词出现的频率
        for (LongWritable val : values) {
            //将 key 组中的每个词频数值 sum 到一起
            sum += val.get();
        }
        result.set(sum);
        //结果逐条输出
        context.write(key,result);
    }
}

3.driver类
package com.fjh;


import com.fjh.mapper.*;
import com.fjh.reduce.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class Main {
    //获取配置
    public static final Configuration conf = new Configuration();
    
    public static void main(String[] args) throws Exception {

        //任务名称
        String jobName = "付君华-作业"+args[0];
        //根据同的参数创建不同的任务
        switch (args[0]){
            case "1":
                System.out.println("作业1--------->");
                workDriverSchedule(jobName, Reduce01.class,Mapper01.class,Reduce01.class, Text.class, IntWritable.class,args[1],args[2]);
                break;
            case "2":
                System.out.println("作业2--------->");
                workDriverSchedule(jobName, null, Mapper02.class,Reduce02.class, Text.class, LongWritable.class,args[1],args[2]);
                break;
            case "3":
                System.out.println("作业3--------->");
                workDriverSchedule(jobName, null, Mapper03.class, Reduce03.class, Text.class, LongWritable.class,args[1],args[2]);
                break;
            case "4":
                System.out.println("作业4--------->");
                workDriverSchedule(jobName, Reduce04.class, Mapper04.class, Reduce04.class, Text.class, IntWritable.class,args[1],args[2]);
                break;
            case "5":
                System.out.println("作业5--------->");
                workDriverSchedule(jobName, null, Mapper05.class, Reduce05.class, Text.class, IntWritable.class,args[1],args[2]);
                break;
            case "6":
                System.out.println("作业6--------->");
                workDriverSchedule(jobName, null, Mapper06.class,Reduce06.class, Text.class, IntWritable.class,args[1],args[2]);
                break;
            case "7":
                System.out.println("作业7--------->");
                workDriverSchedule(jobName, null, Mapper06.class,Reduce06.class, Text.class, IntWritable.class,args[1],args[2]);
                break;
            case "8":
                System.out.println("作业8--------->");
                break;
            default:
                System.out.println("第一个参数错误,范围1-8");
                break;
        }
    }

    public static void workDriverSchedule(String jobName,Class combinerClass,Class mapClass,Class reduceClass,Class jobOutputKeyClass,Class jobOutputValueClass,String inputFilePath,String outputFilePath) throws Exception {
        //创建任务
        Job job =Job.getInstance(conf,jobName);
        //指定执行主类
        job.setJarByClass(Main.class);
        //指定map类
        job.setMapperClass(mapClass);
        //如果有combiner类就指定combiner类
        if(combinerClass!=null){
            job.setCombinerClass(combinerClass);
        }
        //指定reduce类
        job.setReducerClass(reduceClass);
        //指定输出结果的key的类型
        job.setOutputKeyClass(jobOutputKeyClass);
        //指定输出结果的value的类型
        job.setOutputValueClass(jobOutputValueClass);
        //指定输入文件路径
        //修改成多输入的(多个输入用逗号分割)
        String[] input = inputFilePath.split(",");
        for(String filePath:input){
            FileInputFormat.addInputPath(job,new Path(filePath));
        }
        //指定输出结果文件路径
        FileOutputFormat.setOutputPath(job,new Path(outputFilePath));
        //指定job执行模式(等待任务执行完成后,提交任务的客户端才会退出!)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/303776.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号