栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Hadoop三大组件之MapReduce

Hadoop三大组件之MapReduce

初试Hadoop之MapReduce

文章目录

初试Hadoop之MapReduce

一、MapReduce的定义二、MR的优缺点三、认识与识别MR

3.1常用数据序列化类型3.2官方WordCount源码解析3.3手写MR的WordCount案例

一、MapReduce的定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并行运行在一个Hadoop集群上。

二、MR的优缺点

优点:

它是框架,易于编写

良好的扩展性(通过对集群的机器的扩展,从而增加计算能力)

高容错性(MR如果执行失败,它会重新分配任务)

适合做海量数据的离线处理

缺点:

不擅长实时计算,无法像MySql一样,在毫秒或者秒级内返回结果不擅长流式计算,因为输入的数据集时静态的不擅长DAG(有向无环图)计算,使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下 三、认识与识别MR 3.1常用数据序列化类型

老规矩,先和我一起认识认识MR里面的数据类型吧,与Java做对比吧!

Java类型Hadoop Writable类型
BooleanBooleanWritable
ByteByteWritable
IntIntWritable
FloatFloatWritable
LongLongWritable
DoubleDoubleWritable
StringText
MapMapWritable
ArrayArrayWritable
NullNullWritable

有细心的小伙伴可能发现了,这个类型,其实没那么难记,除了String类型外,就只剩下在后面加上一个Writable就可了。

3.2官方WordCount源码解析

看源码之前可以尝试着带着几个问题去看:

一个最基本的MR的程序是如何构成的?程序的运行流程是怎样的?

官方源码解析如下

package org.apache.hadoop.examples;

import java.io.IOException;
import java.io.PrintStream;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount
{
   
  public static class TokenizerMapper extends Mapper
  {
    //当每一个新的单词出现后,就置成1,并且再将其作为一个键值对的形式,因此可以作为常量值为1
    private static final IntWritable one = new IntWritable(1);//valueout
    private Text word = new Text();//keyout

    //重写map方法,读取初试划分的每一个键值对,即行偏移量和一行字符串,key为偏移量,value为该行字符串
    public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException
    {
       //StringTokenizer时Java中用于字符串的
       
      //当我们读取的时候,每一个value其实相当于一行数据,这里使用StringTokenizer进行分割
      StringTokenizer itr = new StringTokenizer(value.toString());
      //遍历我们分割的数据,读取每一个单词
      while (itr.hasMoreTokens()) {
        //获取每一个对呀的key值
        this.word.set(itr.nextToken());
        //one代表1,最开始每个单词都是1次,context直接将写到本地磁盘上
        //write函数直接将两个参数封装成并提交
        context.write(this.word, one);
      }
    }
  }
    
  
  public static class IntSumReducer extends Reducer
  {
    //result就是对结果的统计,统计输出次数
    private IntWritable result = new IntWritable();

    //重写reduce函数,key为单词,values是reducer从多个mapper中得到数据后进行排序并将相同key组
    //这里我们只需要关注的是map和reduce函数处理后的结果组成的结果就可。
    public void reduce(Text key, Iterable values, Reducer.Context context)
      throws IOException, InterruptedException
    {
      //初始化一个sum值作为累加器
      int sum = 0;
        
      for (IntWritable val : values) {
        //累加
        sum += val.get();
      }
      //设置输出的value值
      this.result.set(sum);
      //将reduce的结果提交
      context.write(key, this.result);
    }
  }
    
  
  public static void main(String[] args)
    throws Exception
  {
    //声明配置对象
    Configuration conf = new Configuration();
    //GenericOptionsParser是hadoop框架中解析命令行参数的基本类。它能够辨别一些标准的命令行参数,能够使应用程序轻易地指定namenode,jobtracker,以及其他额外的配置资源
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    //如果你命令行的参数小于2的话就会出错,输入地址和输出地址至于为啥不设置成!=2看后面就知道了    
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount  [...] ");
      System.exit(2);
    }
    //声明job对象
    Job job = Job.getInstance(conf, "word count");
    //声明当前job的驱动类
    job.setJarByClass(WordCount.class);
    //设置mapper类、Combiner类和Reducer类
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    //设置最终输出结果的key类型和value类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    //找到文件的输入路径
    for (int i = 0; i < otherArgs.length - 1; i++) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    //从输入的参数找到输出的路径
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)]));
    //结束程序
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
3.3手写MR的WordCount案例

3.3.1需求说明

在给定的文本文件中统计输出每一个单词出现的总次数

(1)输入数据hello.txt

atguigu atguigu
ss ss
cls cls
jiao
banzhang
xue
hadoop
sgg sgg sgg
nihao nihao
bigdata0111
laiba               

(2)期望输出数据

atguigu  2
banzhang 1
cls  2
hadoop  1
jiao 1
ss   2
xue 1

3.3.2 在IDEA创建项目并完成一些配置信息

​ 创建Maven工程不会IDEA中创建Maven项目的可以参考我之前的博客Maven的安装以及在IDEA中的使用 ,

​ (1) 在pom.xml中添加一下配置信息


    
        junit
        junit
        4.12
    
    
        org.apache.logging.log4j
        log4j-slf4j-impl
        2.12.0
    
    
        org.apache.hadoop
        hadoop-client
        3.1.3
    

​ (2)在项目的src/main/resources目录下,新建一个文件,命名为“log4j2.xml”,在文件中填入。

这个文件可以简单的理解成打印错误日志的文件。不配置不影响你的接下来操作。



    
        
        
            
            
        

    

    
        
        
            
        

        
        
            
        
    


3.3.3模拟实操MR

​ (1)编写mapper类

package com.mr_test.wordcount_hello;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class helloMapper extends Mapper {
    Text outk = new Text();
    IntWritable outv = new IntWritable(1);

    
    @Override
    protected void map(LongWritable key ,Text value ,Mapper.Context context) throws IOException, InterruptedException {
        //获取当前输入的数据
        String line = value.toString();
        //切割数据
        String[] splits = line.split(" ");
        for (String split: splits) {
            outk.set(split);
            //遍历集合 封装 输出数据的key 和 value
            context.write(outk,outv);
        }
    }
}

​ (2)编写Reducer类

package com.mr_test.wordcount_hello;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;


public class helloReducer extends Reducer {
    //为啥不在这里定义sum,因为这里算全局变量
//    private int sum = 0;
    IntWritable outv = new IntWritable();

    
    @Override
    protected void reduce(Text key ,Iterable values ,Reducer.Context context) throws IOException, InterruptedException {
        //每次一个key值就会重置
        int sum=0;
        //获取每一个key出现的次数
        for (IntWritable value: values) {
            sum += value.get();
        }
        //输出
        outv.set(sum);
        context.write(key,outv);
    }
}

​ (3)编写Driver驱动类

package com.mr_test.wordcount_hello;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class helloDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //获取配置信息
        Configuration conf = new Configuration();
        //获取job对象
        Job job = Job.getInstance(conf);

        //关联Driver程序相关的jar
        job.setJarByClass(helloDriver.class);

        //关联Mapper相关的jar
        job.setMapperClass(helloMapper.class);
        //关联Reducer相关的jar
        job.setReducerClass(helloReducer.class);
        
        //设置Mapper输出的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //设置最终输出的KV类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置输入和输出路径
        FileInputFormat.setInputPaths(job,new Path("D:\StudyFile\BigDate\02.大数据技术之Hadoop\03.代码\day04\MapReduce\src\main\java\com\atguigu\mr\wordcount\com.bigData.mapreduce\src\main\resources\wcinput\hello.txt"));
        FileOutputFormat.setOutputPath(job,new Path("D:\StudyFile\BigDate\02.大数据技术之Hadoop\03.代码\day04\MapReduce\src\main\java\com\atguigu\mr\wordcount\com.bigData.mapreduce\src\main\java\testData\wcinput2"));
        
        //提交job
        job.waitForCompletion(true);
    }
}

(4) 运行结果截图

最近忙着摸鱼哈哈,没咋写博客了,但是这两天应该会再次更新MapReduce的后续内容和Yarn的。一起加油呀!!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/760835.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号