
Hadoop Case Study: Separating Phone Traffic Information - an Extension of the Wordcount Example

The wordcount example, developed in the IDEA environment
  • Create a new Maven project in IDEA
  • Add the dependencies
  • Create the Mapper
  • Create the Reducer
  • Create the Driver
  • Run it inside Ubuntu (a Maven environment must be set up there in advance)
Add the dependencies (in the pom.xml file):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.edu.nefu</groupId>
  <artifactId>mr</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>mr</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.8.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.7.2</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugin versions to avoid using Maven defaults -->
      <plugins>
        <!-- clean lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>

    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
      <!-- build a fat jar (jar-with-dependencies) during the package phase -->
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
Create the Mapper class
// the package name is up to you
package com.edu.nefu;
// note: the imported classes must come from the new org.apache.hadoop.mapreduce API
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;


import java.io.IOException;

// map phase
// key in    -- LongWritable, the byte offset of the line in the file
// value in  -- Text, the line itself
// key out   -- Text, a single word
// value out -- IntWritable, the count 1
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1. read one line
        String line = value.toString();
        // 2. split it into words
        String[] words = line.split(" ");
        // 3. emit (word, 1) for every word
        for (String word : words) {
            k.set(word);
            context.write(k,v);
        }

    }

}

Create the Reducer class
// the package name is up to you
package com.edu.nefu;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    int sum;
    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // 1. sum up all counts for this word
        sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // 2. emit (word, total)
        v.set(sum);
        context.write(key,v);
    }

}
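As a quick walk-through of these two classes (assuming a space-separated input line, which is what the split above expects):

input line:               hello world hello
WordcountMapper emits:    (hello, 1), (world, 1), (hello, 1)
after the shuffle, the reducer receives (hello, [1, 1]) and (world, [1])
WordcountReducer emits:   (hello, 2), (world, 1)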

Create the Driver class
package com.edu.nefu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. load the configuration and create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. set the jar by the driver class
        job.setJarByClass(WordcountDriver.class);

        // 3. set the Mapper and Reducer classes
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        // 4. set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. submit the job and wait for completion
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

  • Then run mvn clean package
  • Copy the fat jar (the jar-with-dependencies build) to Ubuntu
  • Note: the following daemons must be running first (a sketch of the start commands follows this list)
    · namenode
    · datanode
    · resourcemanager
    · nodemanager
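A minimal sketch of starting these daemons on a Hadoop 2.7.2 pseudo-distributed setup (assuming the Hadoop sbin directory is on the PATH; use whichever style matches your cluster scripts):

# start HDFS (namenode + datanode) and YARN (resourcemanager + nodemanager) together
start-dfs.sh
start-yarn.sh
# or start the daemons one by one
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
yarn-daemon.sh start resourcemanager
yarn-daemon.sh start nodemanager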
# write a simple wordcount text file
vim wordcount.txt
hdfs dfs -mkdir -p /user/root/input
# copy the jar to the virtual machine and run it
hadoop jar ……
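For reference, a minimal sketch of the full upload-and-run step (the jar name, driver class and output path are assumptions based on this project, not part of the original command):

# put the input file into HDFS
hdfs dfs -put wordcount.txt /user/root/input
# run the job; the output directory must not exist yet
hadoop jar mr-1.0-SNAPSHOT-jar-with-dependencies.jar com.edu.nefu.WordcountDriver /user/root/input /user/root/output
# inspect the result
hdfs dfs -cat /user/root/output/part-r-00000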
The Hadoop serialization example
  1. Requirements
    · For each phone number, sum the upstream traffic and downstream traffic it consumed (the records also carry a status code) and return the total traffic
    · Input data: phone_data.txt
  2. Analysis
    · This is a typical extension of the wordcount example
    · It is hard to implement with Hadoop's built-in data types alone
    · So we use a custom data type
  3. Conditions the custom type must satisfy
    · It must implement the Writable interface
    · Deserialization creates the object reflectively through the empty constructor, so the class must have one:
    public FlowBean(){ super();}
  4. Override the serialization method (write) and the deserialization method (readFields)
  5. The deserialization order must match the serialization order exactly
    · think of the fields as a first-in, first-out queue
  6. To show the result in the output file, override toString(); separating the fields with "\t" makes later processing easier
  7. If the custom bean is transmitted as a key, it must also implement the WritableComparable interface, because the shuffle phase of the MapReduce framework requires keys to be sortable (a sketch of such a variant follows the FlowBean class below)
// the custom Writable bean: FlowBean
package com.edu.nefu;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable{
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    public FlowBean(){
        super();
    }
    public FlowBean(long upFlow,long downFlow) {
        this.upFlow=upFlow;
        this.downFlow=downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // serialization: write the fields in a fixed order
        dataOutput.writeLong(this.upFlow);
        dataOutput.writeLong(this.downFlow);
        dataOutput.writeLong(this.sumFlow);
    }
    @Override
    public String toString(){
        return this.upFlow+":"+this.downFlow+":"+this.sumFlow;
    }
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // deserialization: read the fields back in exactly the order they were written
        this.upFlow=dataInput.readLong();
        this.downFlow=dataInput.readLong();
        this.sumFlow=dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
    public void set(long u,long d){
        this.upFlow=u;
        this.downFlow=d;
        this.sumFlow=u+d;
    }
}
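Regarding point 7 above: FlowBean is only used as a value here, so Writable is enough. If it were ever sent as a map output key, it would also have to implement WritableComparable. A minimal sketch of such a variant (the class name ComparableFlowBean and the descending-by-total ordering are illustrative choices, not part of the original project):

// illustrative only: a key-capable variant of FlowBean, sorted by total traffic (descending)
package com.edu.nefu;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ComparableFlowBean implements WritableComparable<ComparableFlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // required empty constructor for reflective deserialization
    public ComparableFlowBean() { super(); }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // must read in exactly the order written above
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public int compareTo(ComparableFlowBean o) {
        // the shuffle sorts keys with this method; here: larger total traffic first
        return Long.compare(o.sumFlow, this.sumFlow);
    }
}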

// the Mapper, rewritten from the wordcount Mapper
package com.edu.nefu;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // split the record into fields
            String line = value.toString();
            String[] fields = line.split(" ");
            Text k = new Text();
            FlowBean v = new FlowBean();
            // fields[1] is the phone number; the upstream and downstream traffic
            // are the third-to-last and second-to-last fields
            k.set(fields[1]);
            v.setUpFlow(Long.parseLong(fields[fields.length-3]));
            v.setDownFlow(Long.parseLong(fields[fields.length-2]));
            v.setSumFlow(Long.parseLong(fields[fields.length-3])+Long.parseLong(fields[fields.length-2]));
            context.write(k, v);
    }
}
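As an illustration only (phone_data.txt itself is not shown here), a hypothetical space-separated record consistent with this parsing could be:

1 13736230513 192.196.100.1 www.example.com 2481 24681 200

For that line, fields[1] is the phone number 13736230513, fields[fields.length-3] is the upstream traffic 2481 and fields[fields.length-2] is the downstream traffic 24681, so the mapper emits the key 13736230513 with a FlowBean of (2481, 24681, 27162).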
// the rewritten Reducer
package com.edu.nefu;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // sum the upstream and downstream traffic of all records for this phone number
        long total_upFlow = 0;
        long total_downFlow = 0;
        for(FlowBean bean:values){
            total_upFlow += bean.getUpFlow();
            total_downFlow += bean.getDownFlow();
        }
        FlowBean v = new FlowBean();
        v.set(total_upFlow,total_downFlow);
        context.write(key,v);
    }
}

// the rewritten Driver
package com.edu.nefu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;


public class FlowBeanDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. load the configuration and create the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. set the jar by the driver class
        job.setJarByClass(FlowBeanDriver.class);

        // 3. set the Mapper and Reducer classes
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5. set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. submit the job and wait for completion
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    }
}

Summary
· On top of wordcount, this only adds the FlowBean class and adapts the Mapper and Reducer to it
· The Driver as a whole barely changes
· It is a straightforward extension of the wordcount example
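Running the serialization job mirrors the wordcount run; a minimal sketch (the jar name and HDFS paths here are assumptions, not taken from the article):

hdfs dfs -put phone_data.txt /user/root/flow_input
hadoop jar mr-1.0-SNAPSHOT-jar-with-dependencies.jar com.edu.nefu.FlowBeanDriver /user/root/flow_input /user/root/flow_output
hdfs dfs -cat /user/root/flow_output/part-r-00000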

Theory: MapReduce input splits
1. How splits determine MapTask parallelism
  1. Why it matters
    · The MapTask parallelism determines how concurrently the map phase runs, which in turn affects how fast the whole Job finishes
    · How many tasks process the map input in parallel directly affects the cluster's concurrent processing capacity
  2. What determines MapTask parallelism
    · Block: HDFS physically divides data into blocks
    · Input split: a split is only a logical division of the input; the data is not physically cut into pieces on disk
  3. Blocks are cut according to the block size (BlockSize)
  • The map-phase parallelism of a Job is determined by the number of input splits computed by the client when the Job is submitted
  • Each split is processed by one MapTask instance
  • By default, the split size equals the block size
  • Splitting is done per file, not over the data set as a whole
2. The FileInputFormat split procedure
  1. First find the path where the input data is stored
  2. Then iterate over (and plan splits for) every file in that directory
  3. For the first file:
    · get the file size
    · compute the split size
    · by default the split size equals the block size
    · start splitting: before each cut, check whether the remaining part is larger than 1.1 times the split size; if it is not, do not cut it further (a frequent exam question; a sketch of this computation follows this list)
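A simplified sketch of this computation, modelled on FileInputFormat.getSplits() in Hadoop 2.x (the class and method names here are made up for illustration; the real method also handles unsplittable files, block locations, etc.):

// simplified split planning for one file, following the rules above
public class SplitPlanSketch {
    static final double SPLIT_SLOP = 1.1; // the 1.1 factor

    // returns the number of splits for a file of the given length
    static int planSplits(long fileLength, long blockSize, long minSize, long maxSize) {
        // default: minSize = 1, maxSize = Long.MAX_VALUE, so splitSize == blockSize
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));
        int splits = 0;
        long bytesRemaining = fileLength;
        // keep cutting full splits while the remainder is more than 1.1 * splitSize
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            bytesRemaining -= splitSize;
            splits++;
        }
        // whatever is left (at most 1.1 * splitSize) becomes the last split
        if (bytesRemaining != 0) {
            splits++;
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024;
        // a 129 MB file with a 128 MB block size yields a single split (129 <= 1.1 * 128)
        System.out.println(planSplits(129 * mb, 128 * mb, 1, Long.MAX_VALUE));
        // a 200 MB file yields two splits (128 MB + 72 MB)
        System.out.println(planSplits(200 * mb, 128 * mb, 1, Long.MAX_VALUE));
    }
}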