栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

大数据开发Hadoop-----MapReduce

大数据开发Hadoop-----MapReduce

1.常用得数据序列化类型
2.MapReduce编程规范
1)Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5)map()方法(MapTask进程)对每一个调用一次
2)Reducer阶段
(1)用户自定义的Reducer要继承自己的父类
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)ReduceTask进程对每一组相同k的组调用一次reduce()方法
3)Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是
封装了MapReduce程序相关运行参数的job对象

3.需求:统计单词出现的次数
(1)引入依赖

    
        
            org.apache.hadoop
            hadoop-client
            3.1.3
        
        
            junit
            junit
            4.12
        
        
            org.slf4j
            slf4j-log4j12
            1.7.30
        
    

(2)自定义Mapper类

package zjc.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper {

    Text k=new Text();
    IntWritable v=new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line=value.toString();
        String[] words = line.split(" ");
        for(String word:words){
            k.set(word);
            context.write(k,v);
        }
    }
}

(3)自定义Reducer类

package zjc.hadoop;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;


public class WordCountReducer extends Reducer {
    int sum;
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        sum=0;
        for (IntWritable count:values) {
            sum += count.get();
        }
        v.set(sum);
        context.write(key,v);
    }
}

(4)自定义Driver类

package zjc.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountDriver {
    public static void  main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 1 获取配置信息以及获取 job 对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
// 2 关联本 Driver 程序的 jar
        job.setJarByClass(WordCountDriver.class);
// 3 关联 Mapper 和 Reducer 的 jar
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
// 4 设置 Mapper 输出的 kv 类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出 kv 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交 job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

4.自定义Bean对象实现序列化接口
(1)必须实现 Writable 接口
(2)构造函数
(3)重写序列化write与反序列化readFields的方法
(4)toString方法
5.需求:统计每一个手机号耗费的总上行流量、总下行流量、总流量

(1)编写流量统计的Bean对象

package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
//1 继承 Writable 接口
public class FlowBean implements Writable {
 private long upFlow; //上行流量
 private long downFlow; //下行流量
 private long sumFlow; //总流量
 //2 提供无参构造
 public FlowBean() {
 }
 //3 提供三个参数的 getter 和 setter 方法
 public long getUpFlow() {
 return upFlow;
 }
 public void setUpFlow(long upFlow) {
 this.upFlow = upFlow;
 }
 public long getDownFlow() {
 return downFlow;
 }
 public void setDownFlow(long downFlow) {
 this.downFlow = downFlow;
 }
 public long getSumFlow() {
 return sumFlow;
}
 public void setSumFlow(long sumFlow) {
 this.sumFlow = sumFlow;
 }
 public void setSumFlow() {
 this.sumFlow = this.upFlow + this.downFlow;
 }
 //4 实现序列化和反序列化方法,注意顺序一定要保持一致
 @Override
 public void write(DataOutput dataOutput) throws IOException {
 dataOutput.writeLong(upFlow);
 dataOutput.writeLong(downFlow);
 dataOutput.writeLong(sumFlow);
 }
 @Override
 public void readFields(DataInput dataInput) throws IOException {
 this.upFlow = dataInput.readLong();
 this.downFlow = dataInput.readLong();
 this.sumFlow = dataInput.readLong();
 }
 //5 重写 ToString
 @Override
 public String toString() {
 return upFlow + "t" + downFlow + "t" + sumFlow;
 } }

(2)Mapper类

package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class FlowMapper extends Mapper 
{
 private Text outK = new Text();
 private FlowBean outV = new FlowBean();
 @Override
 protected void map(LongWritable key, Text value, Context context) 
throws IOException, InterruptedException {
 //1 获取一行数据,转成字符串
 String line = value.toString();
 //2 切割数据
 String[] split = line.split("t");
 //3 抓取我们需要的数据:手机号,上行流量,下行流量
 String phone = split[1];
 String up = split[split.length - 3];
 String down = split[split.length - 2];
 //4 封装 outK outV
 outK.set(phone);
 outV.setUpFlow(Long.parseLong(up));
 outV.setDownFlow(Long.parseLong(down));
 outV.setSumFlow();
 //5 写出 outK outV
 context.write(outK, outV);
 } }

(3)Reducer类

package com.atguigu.mapreduce.writable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class FlowReducer extends Reducer 
{
 private FlowBean outV = new FlowBean();
 @Override
 protected void reduce(Text key, Iterable values, Context 
context) throws IOException, InterruptedException {
 long totalUp = 0;
 long totalDown = 0;
 //1 遍历 values,将其中的上行流量,下行流量分别累加
 for (FlowBean flowBean : values) {
 totalUp += flowBean.getUpFlow();
 totalDown += flowBean.getDownFlow();
 }
 //2 封装 outKV
 outV.setUpFlow(totalUp);
 outV.setDownFlow(totalDown);
 outV.setSumFlow();
 //3 写出 outK outV
 context.write(key,outV);
 } }

(4)驱动类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class FlowDriver {
 public static void main(String[] args) throws IOException, 
ClassNotFoundException, InterruptedException {
 //1 获取 job 对象
 Configuration conf = new Configuration();
 Job job = Job.getInstance(conf);
 //2 关联本 Driver 类
  job.setJarByClass(FlowDriver.class);
 //3 关联 Mapper 和 Reducer
 job.setMapperClass(FlowMapper.class);
 job.setReducerClass(FlowReducer.class);
 
//4 设置 Map 端输出 KV 类型
 job.setMapOutputKeyClass(Text.class);
 job.setMapOutputValueClass(FlowBean.class);
 
//5 设置程序最终输出的 KV 类型
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(FlowBean.class);
 
//6 设置程序的输入输出路径
 FileInputFormat.setInputPaths(job, new Path("D:\inputflow"));
 FileOutputFormat.setOutputPath(job, new Path("D:\flowoutput"));
 
//7 提交 Job
 boolean b = job.waitForCompletion(true);
 System.exit(b ? 0 : 1);
 } }

6.FileInputFormat 常见的接口实现类
FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、
NLineInputFormat、CombineTextInputFormat 和自定义 InputFormat 等。
(1)TextInputFormat
TextInputFormat 是默认的 FileInputFormat 实现类。按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量, LongWritable 类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text 类型。
(2)CombineTextInputFormat
生成切片过程包括:虚拟存储过程和切片过程二部分。
1)虚拟存储过程:
将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不
大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,
那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值 2 倍,此时
将文件均分成 2 个虚拟存储块(防止出现太小切片)。
例如 setMaxInputSplitSize 值为 4M,输入文件大小为 8.02M,则先逻辑上分成一个
4M。剩余的大小为 4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储
文件,所以将剩余的 4.02M 文件切分成(2.01M 和 2.01M)两个文件。
2)切片过程:
(a)判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独
形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
(c)测试举例:有 4 个小文件大小分别为 1.7M、5.1M、3.4M 以及 6.8M 这四个小文件,则虚拟存储之后形成 6 个文件块,大小分别为:1.7M,(2.55M、2.55M),3.4M 以及(3.4M、3.4M)最终会形成 3 个切片,大小分别为: (1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
应用:

  // 如果不设置 InputFormat,它默认用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置 4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

7.MapReduce工作流程

8.分区案例实操:


(1)增加分区类

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner {
 @Override
 public int getPartition(Text text, FlowBean flowBean, int numPartitions) 
{
 //获取手机号前三位 prePhone
 String phone = text.toString();
 String prePhone = phone.substring(0, 3);
 //定义一个分区号变量 partition,根据 prePhone 设置分区号
 int partition;
 if("136".equals(prePhone)){
 partition = 0;
 }else if("137".equals(prePhone)){
 partition = 1;
 }else if("138".equals(prePhone)){
 partition = 2;
 }else if("139".equals(prePhone)){
 partition = 3;
 }else {
 partition = 4;
 }
 //最后返回分区号 partition
 return partition;
 } }

(2)Driver驱动类加上

//8 指定自定义分区器
 job.setPartitionerClass(ProvincePartitioner.class);
 //9 同时指定相应数量的 ReduceTask
 job.setNumReduceTasks(5);

9.Combiner合并
Combiner是在每一个MapTask所在的节点运行;
Reducer是接受全局所有Mapper的输出结果。
Combiner的意义就是对每一个MapTask的输出进行局部汇总,以减小网络传输量。
案例实操:
可以自定义合并类:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountCombiner extends Reducer {
private IntWritable outV = new IntWritable();
 @Override
 protected void reduce(Text key, Iterable values, Context 
context) throws IOException, InterruptedException {
 int sum = 0;
 for (IntWritable value : values) {
 sum += value.get();
 }
 //封装 outKV
 outV.set(sum);
 //写出 outKV
 context.write(key,outV);
 } }

wordcountDriver驱动类中指定combiner

// 指定需要使用 combiner,以及用哪个类作为 combiner 的逻辑
job.setCombinerClass(WordCountCombiner.class);

10.OutputFormat接口实现类
自定义outputformat
应用场景:输出数据到MySQL/Hbase/Elasticsearch
案例:
11.

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/308104.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号