栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce课设

MapReduce课设

书写bean类
package ks;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class mybean implements Writable{
	//定义变量
    private long upFlow;  //上行数据包数
    private long downFlow;  //下行数据包数
    private long sumFlow;
    
    //空参构造
    public mybean(){
        super();
    }
    
    //有参构造
    public mybean(long upFlow, long downFlow){
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

	//定义set方法,可有可无
    public void set(long upFlow,long downFlow){
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public  void  setSumFlow(long sumFlow){
        this.sumFlow = sumFlow;
    }

    public  long getSumFlow(){
        return sumFlow;
    }

	重写tostring方法
    public String toString() {
        return upFlow +
                "t" + downFlow +
                "t" + upFlow + downFlow;
    }

    //序列化方法
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

	//反序列化方法,顺序必须和序列化方法一致
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
}

#重写map方法

package ks;

import mreduce.mybean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
//第一个参数表示接收key的数据类型(一般都为LongWritable )
//第二个参数表示接受的value的数据类型(一般都为Text)
//第三个参数表示写出的key的数据类型
//第四个参数表示写出的value的数据类型		
public class mymapper extends Mapper{
        Text k = new Text();
        mybean v = new mybean();
        @Override
        protected void map(LongWritable key, Text value,Context context)
                throws IOException, InterruptedException {
            //获取一行
            String line = value.toString();
            //对获取到的文本进行切分
            String[] splited = line.split("t");
			//设置key值
            k.set(splited[1]);
            long upFlow = Long.parseLong(splited[splited.length-3]);
            long downFlow = Long.parseLong(splited[splited.length-2]);
			//设置value值
            v.set(upFlow,downFlow);
			//写出数据
            context.write(k,v);
    }
}
重写reduce方法
package ks;
import mreduce.mybean;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
//第一个参数表示接收key的数据类型(一般都为LongWritable )
//第二个参数表示接受的value的数据类型(一般都为Text)
//第三个参数表示写出的key的数据类型
//第四个参数表示写出的value的数据类型		
public class myReducer extends Reducer {
	//定义一个bean对象,用于存写出的value
    mybean v = new mybean();
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlow = 0;
        //进行统计
        for (mybean flow: values) {
            sumUpFlow += flow.getUpFlow();
            sumDownFlow += flow.getDownFlow();
        }
		//调用set方法,初始化v对象
        v.set(sumUpFlow,sumDownFlow);
		//写出
        context.write(key,v);
    }
}

分区
import mreduce.mybean;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.io.Text;
//第一个参数表示接收key的数据类型
//第二个参数表示接受的value的数据类型
public class mypartition extends Partitioner {
    @Override
    public int getPartition(Text key, mybean value, int numPartitions) {
		//获取手机号的前三位数字
        String pre =  key.toString().substring(0,3);
		//默认分区为4号分区
        int partition = 4;
        if("136".equals(pre)){
        	//手机号前三位为136在0号分区
            partition = 0;
        }else if("137".equals(pre)){
        	//手机号前三位为137在1号分区
            partition = 1;
        }else if("138".equals(pre)){
        	//手机号前三位为138在2号分区
            partition = 2;
        }else if("139".equals(pre)){
        //手机号前三位为139在3号分区
            partition = 3;
        }
        //返回分区
        return partition;
    }
}
主类
package ks;

import mreduce.mybean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class mydriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("mapred.jar", "D:\Hadoop\untitled1\untitled\target\untitled1-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(conf);
		//进行打包
        job.setJarByClass(mydriver.class);
		//设置数据源路径
        FileInputFormat.setInputPaths(job, new Path("file:///d:\value.txt"));
		//绑定map类
        job.setMapperClass(mymapper.class);

        //shuffle过程
        job.setPartitionerClass(mypartition.class);
        job.setNumReduceTasks(5);
		//设置map的输出数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(mybean.class);
		//绑定reduce类
        job.setReducerClass(myReducer.class);
		//设置reduce的输出数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(mybean.class);
		//设置文件保存路径
        FileOutputFormat.setOutputPath(job, new Path("file:///d:\output1"));

        //交给yarn去执行,直到执行结束才退出本程序
        job.waitForCompletion(true);

        BasicConfigurator.configure();
    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/663135.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号