栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce统计流量案例(自定义排序-全排序)基本实现

MapReduce统计流量案例(自定义排序-全排序)基本实现

输入数据

手机号  总上行流量  总下行流量  总流量

11136230513	500	500	1000
12959002129	1938	180	2118
19943685818	3659	3538	7197
22246544121	250	250	500
22256435636	250	250	500
33366251146	240	0	240
33371575951	1527	2106	3633
33388413456	4116	1432	5548
Maven必须配置

参考 MapReduce统计流量案例 的Maven配置  

resources目录下log4j.properties 配置

参考 MapReduce统计流量案例 的log4j.properties配置  

自定义Writable类实现(FlowBean)
package com.test.mapreduce.comparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable {

    private Long upFlow;   // 上行流量
    private Long downFlow; // 下行流量
    private Long sumFlow;  // 总流量

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    
    public FlowBean() {
    }

    
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // 序列化Value数据
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);

    }

    
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // 反序列化数据
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    
    @Override
    public String toString() {
        return upFlow + "t" + downFlow + "t" + sumFlow;
    }

    
    @Override
    public int compareTo(FlowBean o) {
        // 按照总流量比较,倒序排序(倒序-1, 正序1)
        if (this.sumFlow > o.sumFlow){
            return -1;
        }else if(this.sumFlow < o.sumFlow){
            return 1;
        }else {
            return 0;
        }

    }
}
自定义Mapper类实现(FlowMapper)
package com.test.mapreduce.comparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper {

    // 定义一个FlowBean对象,用于封装key
    private FlowBean k = new FlowBean();
    // 定义一个Text对象,用于封装value
    private Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1.获取每一行数据
        String line = value.toString();
        // 2.切割
        String[] dataList = line.split("t");
        // 3.封装数据
        k.setUpFlow(Long.parseLong(dataList[1]));
        k.setDownFlow(Long.parseLong(dataList[2]));
        k.setSumFlow();
        v.set(dataList[0]);
        // 4.输出
        context.write(k, v);
    }
}
自定义Reducer类实现(FlowReducer)
package com.test.mapreduce.comparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer {

    @Override
    protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException {
        // 循环写出,避免总流量相同情况
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
自定义Reducer类实现(FlowDriver)
package com.test.mapreduce.comparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.创建配置信息Configuration对象并获取Job单例对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2.设置关联本Driver程序的jar
        job.setJarByClass(FlowDriver.class);

        // 3.设置关联Mapper和Reducer的jar
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4.设置Mapper输出的kv类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. 设置最终输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6.设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\input"));
        FileOutputFormat.setOutputPath(job, new Path("D:\output"));

        // 7.提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
输出数据
19943685818	3659	3538	7197
33388413456	4116	1432	5548
33371575951	1527	2106	3633
12959002129	1938	180	2118
11136230513	500	500	1000
22256435636	250	250	500
22246544121	250	250	500
33366251146	240	0	240

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/687477.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号