栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Hadoop】——MapReduce:概述&案例

【Hadoop】——MapReduce:概述&案例

一、MapReduce概述 1. 定义

MapReduce是一个分布式运算程序的编程框架,使用户开发“基于Hadoop的数据分析应用”的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

2. 优势 <1>易于编程

用户只关心业务逻辑。实现框架的接口。

<2>良好的扩展性

可以动态增加服务器,解决计算资源不够的问题。

<3>高容错性

任何一个节点宕机,可以将任务转移到其他节点

<4>适合海量数据计算

TB/PB
几千台服务器共同计算

3. 劣势 <1>不适合实时计算 <2>不适合流式计算

sparkstreaming ,flink擅长

<3>不适合DAG有向无环图计算

spark适合

4. MapReduce核心思想

<1>分为Map阶段和Reduce阶段
<2>Map阶段并发MapTask,完全并行运行,互不干扰
<3>Reduce阶段的并发ReduceTask,完全并行运行,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
<4>MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那么只能多个MapReduce程序,串行运行

5. MapReduce进程 <1>MrAppMaster

负责整个程序的过程调度以及状态协调

<2>MapTask

负责Map阶段的整个数据处理流程

<3>ReduceTask

负责Reduce阶段的整个数据处理流程

6. WordCount案例 <1>Map类

<2>Reduce类

<3>Main方法

7. 常用数据序列化类型

8. MapReduce编程规范

用户编写的程序分为三部分:Mapper、Reducer 和Driver

<1> Mapper
    用户自定义的Mapper要继承自己的父类Mapper的输入数据是kv对的形式Mapper中的业务逻辑写在map()方法中Mapper的输出数据是kv对的形式map()方法(MapTask进程)对每一个调用一次
<2> Reducer
    用户自定义的Reducer要继承自己的父类Reducer的输入数据类型对应Mapper的输出数据类型,也是KV对Reducer的业务逻辑写在reduce()方法中ReduceTask进程对每一组相同k的组调用一次reduce()方法

<3>Driver

相当于YARN集群的客户端,用于提交我们整个程序到YRAN集群,提交的是封装了MapReduce程序相关运行参数的job对象。

    获取配置信息,获取job对象实例指定本程序jar包所在的本地路径关联Mapper/Reducer业务类指定Mapper输出数据的kv类型指定最终输出的数据的kv类型指定job的输入原始文件所在目录指定job的输出结果所在目录(不能提前存在)提交作业
9. WordCount案例实操 1. 需求

<1>在给定的文本文件中统计输出每个单词出现的总次数
<2>输入数据: hello.txt

<3>期望输出数据: 统计结果

2. 过程分析 <1>Mapper
    将MapTask传给我们的文本先转换为String根据空格将这一行切分成单词将单词输出为<单词,1>的形式
<2>Reducer
    汇总各个key的个数输出该key的总次数
<3>Driver
    获取配置信息,获取job对象实例指定本程序jar包所在的本地路径关联Mapper/Reducer业务类指定Mapper输出数据的kv类型指定最终输出的数据的kv类型指定job的输入原始文件所在目录指定job的输出结果所在目录(不能提前存在)提交作业
3. 环境准备 <1> 新建项目

<2> Maven 坐标
    
        
            org.apache.hadoop
            hadoop-client
            3.1.3
        

        
            junit
            junit
            4.12
        
        
            org.slf4j
            slf4j-log4j12
            1.7.30
        
    

<3>Log4J配置文件
log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
<4>创建包路径
com.demo.mapreduce.wordcount
4. 编写程序 <1>WordCountMapper
package com.demo.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;


public class WordCountMapper extends Mapper {
    private Text outK = new Text();
    private IntWritable outV = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
        //1. 获取一行数据
        String line = value.toString();
        //2. 对数据进行切割
        String[] words = line.split(" ");
        //3. 循环写出
        for (String word : words) {
            outK.set(word);
            context.write(outK, outV);
        }
    }
}

<2>WordCountReducer
package com.demo.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class WordCountReducer extends Reducer {
    private IntWritable outV = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable values, Reducer.Context context) throws IOException, InterruptedException {
        //1. 累加
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        //2. 写出
        outV.set(sum);
        context.write(key, outV);
    }
}

<3>WordCountDriver
package com.demo.mapreduce.wordcount;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class WordCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //1. 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //2. 设置jar包路径
        job.setJarByClass(WordCountDriver.class);
        //3. 关联mapper和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //4. 设置map的输出的key 和 value 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 5. 设置最终输出的key 和 value 的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 6. 指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path("E:\javaWordspaces\MapReduceDemohello.txt"));
        //7.指定job的输出结果所在目录(不能提前存在)
        FileOutputFormat.setOutputPath(job, new Path("E:\javaWordspaces\MapReduceDemo\result"));
        // 8.提交作业
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

}

5. 集群运行 <1>maven打包插件

    
        
            maven-compiler-plugin
            3.6.1
            
                1.8
                1.8
            
        
        
            maven-assembly-plugin
            
                
                    jar-with-dependencies
                
            
            
                
                    make-assembly
                    package
                    
                        single
                    
                
            
        
    


<2>修改WordCountDriver

<3>mvn clean install <4>将jar包上传到linux服务器

<5>准备数据

HDFS 下创建 /wcinput/word.txt

<6>执行命令
hadoop jar MapReduceDemo-1.0-SNAPSHOT.jar com.demo.mapreduce.wordcount.WordCountDriver /wcinput/word.txt /wcoutput
<7>结果


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711716.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号