栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Hadoop序列化--统计每一个手机号全年的总话费

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Hadoop序列化--统计每一个手机号全年的总话费

1.数据准备

2.基本思路:
Map阶段:
a.读取一行数据,切分字段;
b.抽取手机号、套餐基本费、语音通信费、短信彩信费、流量费;
c.以手机号为key,bean对象为value输出,即context.write(手机号,bean)。
Reduce阶段:
a.按手机号分组,累加套餐基本费、语音通信费、短信彩信费、流量费,得到全年总话费;
b.自定义bean(实现Hadoop的Writable接口)封装各项话费信息,并将bean作为map输出的value(key为手机号)在map与reduce之间传输;
c. MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key,即本例中的手机号。
3.freebean

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Hadoop {@link Writable} value carrying one phone number's fee breakdown: base package fee,
 * voice fee, SMS/MMS fee, data fee, and their total.
 *
 * <p>{@link #write(DataOutput)} and {@link #readFields(DataInput)} use the same fixed field order.
 * Keep them in sync if fields are added.
 */
public class freebean implements Writable {
    /** Base package fee. */
    private long baseFee;
    /** Voice call fee. */
    private long communicateFee;
    /** SMS/MMS fee. */
    private long msgFee;
    /** Mobile data fee. */
    private long flowFee;
    /** Total fee; computed by the 4-arg constructor and {@link #setFee}, or set via {@link #setSumFee}. */
    private long sumFee;

    /**
     * No-arg constructor. It is required because deserialization instantiates the class
     * reflectively and then calls {@link #readFields(DataInput)}.
     */
    public freebean() {
    }

    /**
     * Creates a bean from the four fee components and computes their total.
     *
     * @param baseFee        base package fee
     * @param communicateFee voice call fee
     * @param msgFee         SMS/MMS fee
     * @param flowFee        mobile data fee
     */
    public freebean(long baseFee, long communicateFee, long msgFee, long flowFee) {
        this.baseFee = baseFee;
        this.communicateFee = communicateFee;
        this.msgFee = msgFee;
        this.flowFee = flowFee;
        // The total is not a constructor parameter, so it must be derived here.
        this.sumFee = baseFee + communicateFee + msgFee + flowFee;
    }

    /**
     * Sets all four fee components at once and recomputes the total.
     *
     * @param baseFee        base package fee
     * @param communicateFee voice call fee
     * @param msgFee         SMS/MMS fee
     * @param flowFee        mobile data fee
     */
    public void setFee(long baseFee, long communicateFee, long msgFee, long flowFee) {
        this.baseFee = baseFee;
        this.communicateFee = communicateFee;
        this.msgFee = msgFee;
        this.flowFee = flowFee;
        this.sumFee = baseFee + communicateFee + msgFee + flowFee;
    }

    /**
     * Serializes the five fields as longs in the order base, voice, SMS/MMS, data, total.
     *
     * @param out destination stream
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(baseFee);
        out.writeLong(communicateFee);
        out.writeLong(msgFee);
        out.writeLong(flowFee);
        out.writeLong(sumFee);
    }

    /**
     * Deserializes the five fields. The read order must match {@link #write(DataOutput)} exactly.
     *
     * @param in source stream
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        baseFee = in.readLong();
        communicateFee = in.readLong();
        msgFee = in.readLong();
        flowFee = in.readLong();
        sumFee = in.readLong();
    }

    /**
     * Returns base, voice, SMS/MMS, data and total fees separated by tab characters, for
     * printing as text output.
     */
    @Override
    public String toString() {
        return baseFee + "\t" + communicateFee + "\t" + msgFee + "\t" + flowFee + "\t" + sumFee;
    }

    /** @return the base package fee */
    public long getbaseFee() {
        return baseFee;
    }

    /** Sets the base package fee. Does not recompute the total. */
    public void setbaseFee(long baseFee) {
        this.baseFee = baseFee;
    }

    /** @return the voice call fee */
    public long getCommunicateFee() {
        return communicateFee;
    }

    /** Sets the voice call fee. Does not recompute the total. */
    public void setCommunicateFee(long communicateFee) {
        this.communicateFee = communicateFee;
    }

    /** @return the SMS/MMS fee */
    public long getMsgFee() {
        return msgFee;
    }

    /** Sets the SMS/MMS fee. Does not recompute the total. */
    public void setMsgFee(long msgFee) {
        this.msgFee = msgFee;
    }

    /** @return the mobile data fee */
    public long getFlowFee() {
        return flowFee;
    }

    /** Sets the mobile data fee. Does not recompute the total. */
    public void setFlowFee(long flowFee) {
        this.flowFee = flowFee;
    }

    /** @return the total fee */
    public long getSumFee() {
        return sumFee;
    }

    /** Overrides the total fee directly. */
    public void setSumFee(long sumFee) {
        this.sumFee = sumFee;
    }

}

4.freemap

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import java.io.IOException;

/**
 * Mapper that parses one tab-separated billing record per input line and emits
 * (phone number, fee bean).
 *
 * <p>Column layout as read below (confirm against the actual data file): index 1 = phone number,
 * indices 2..5 = base fee, voice fee, SMS/MMS fee, data fee.
 */
public class freemap extends Mapper<LongWritable, Text, Text, freebean> {
    /** A record must have at least indices 0..5 to be parsed. */
    private static final int MIN_FIELDS = 6;

    /**
     * Parses one input line and writes (phone number, fees) to the context.
     *
     * @param key     byte offset of the line (unused)
     * @param value   the raw text line
     * @param context task context to write output to
     * @throws IOException          if writing fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] datas = value.toString().split("\t");
        if (datas.length < MIN_FIELDS) {
            // Skip blank or truncated lines instead of failing the whole task. Count them so
            // they are visible in the job counters.
            context.getCounter("freemap", "MALFORMED_LINES").increment(1);
            return;
        }
        String phoneNum = datas[1];
        freebean fb = new freebean();
        // setFee also computes the per-record total.
        fb.setFee(Long.parseLong(datas[2]),
                Long.parseLong(datas[3]),
                Long.parseLong(datas[4]),
                Long.parseLong(datas[5]));
        context.write(new Text(phoneNum), fb);
    }
}

5.freeReduce

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that sums each fee component over all records of one phone number and emits
 * (phone number, bean holding the summed fees and their total).
 */
public class freeReduce extends Reducer<Text, freebean, Text, freebean> {
    /**
     * Accumulates the fees for one phone number.
     *
     * @param key     the phone number
     * @param values  all fee beans emitted for this phone number
     * @param context task context to write output to
     * @throws IOException          if writing fails
     * @throws InterruptedException if the task is interrupted
     */
    @Override
    protected void reduce(Text key, Iterable<freebean> values, Context context) throws IOException, InterruptedException {
        long baseFee = 0;
        long communicateFee = 0;
        long msgFee = 0;
        long flowFee = 0;

        for (freebean value : values) {
            baseFee += value.getbaseFee();
            communicateFee += value.getCommunicateFee();
            msgFee += value.getMsgFee();
            flowFee += value.getFlowFee();
        }

        freebean fb = new freebean();
        // setFee computes the overall total from the four sums.
        fb.setFee(baseFee, communicateFee, msgFee, flowFee);
        context.write(key, fb);
    }
}

6.freeDrive

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;


/**
 * Job driver for the per-phone-number annual fee summation.
 *
 * <p>Usage: {@code freeDrive [inputPath [outputPath]]}. Missing arguments fall back to the
 * original hard-coded local paths.
 */
public class freeDrive {
    /** Input file used when no input path argument is given. */
    private static final String DEFAULT_INPUT = "F:/test_shixun/word/phoneFee.txt";
    /** Output directory used when no output path argument is given. */
    private static final String DEFAULT_OUTPUT = "F:/test_shixun/output3";

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args optional input path (args[0]) and output path (args[1])
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(freeDrive.class);
        job.setMapperClass(freemap.class);
        job.setReducerClass(freeReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(freebean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(freebean.class);

        Path input = new Path(args.length > 0 ? args[0] : DEFAULT_INPUT);
        Path output = new Path(args.length > 1 ? args[1] : DEFAULT_OUTPUT);

        // Use the job's own configuration so the file system matches the one the job writes to.
        // WARNING: this recursively deletes an existing output directory, including one passed
        // on the command line.
        FileSystem fileSystem = output.getFileSystem(job.getConfiguration());
        if (fileSystem.exists(output)) {
            fileSystem.delete(output, true);
        }

        FileInputFormat.setInputPaths(job, input);
        FileOutputFormat.setOutputPath(job, output);

        boolean flag = job.waitForCompletion(true);
        System.exit(flag ? 0 : 1);
    }
}

7.输出结果

8.原来的数据

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/678960.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号