栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【Hadoop学习项目】3. 求平均值 + 使用combine

【Hadoop学习项目】3. 求平均值 + 使用combine

无combine 0. 项目结构

1. AvgDriver.java
package hadoop_test.avg_demo_03;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgDriver {

    public static void main(String[] args) throws Exception {

        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(AvgDriver.class);
        job.setMapperClass(AvgMapper.class);
        job.setReducerClass(AvgReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/hadoop_test/avg/avg.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/avg/result"));
        job.waitForCompletion(true);
    }
}

2. AvgMapper
package hadoop_test.avg_demo_03;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AvgMapper extends Mapper {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // class01 69
        String line = value.toString();
        // 按空格进行分割,读取第一个数据,将其作为Key,例:class01
        String outkeys=line.split(" ")[0];
        // 按空格进行分割,读取第二个数据,将其作为Value,例:69
        // 将Text转化为十进制整数
        int outvalues=Integer.parseInt(line.split(" ")[1]);

        context.write(new Text(outkeys),new IntWritable(outvalues));
    }
}

3. AvgReducer
package hadoop_test.avg_demo_03;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class AvgReducer  extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int flag=0;             // 计算考试次数
        int count=0;            // 计算考试成绩总和
        for (IntWritable value:
             values) {
            count+=value.get();
            flag+=1;
        }
        float re=count/flag;    // 求平均分

        context.write(new Text(key),new DoubleWritable(re));
    }
}

有combine 0. 项目结构

1. AvgDriver
package hadoop_test.avg_hmk_03;

import hadoop_test.Utils_hadoop;
import hadoop_test.word_count_demo_01.WordCountCombine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgDriver {

    public static void main(String[] args) throws Exception {

        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(AvgDriver.class);

        job.setMapperClass(AvgMapper.class);
        job.setCombinerClass(AvgCombine.class);
        job.setReducerClass(AvgReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);         // 注意Map输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.setInputPaths(job, new Path("/hadoop_test/avg/avg.txt"));
        //FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/avg/homework_result"));
        if( Utils_hadoop.testExist(conf,"/hadoop_test/avg/homework_result")){
            Utils_hadoop.rmDir(conf,"/hadoop_test/avg/homework_result");
        }
        FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/avg/homework_result"));

        job.waitForCompletion(true);
    }
}

2. AvgMapper
package hadoop_test.avg_hmk_03;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AvgMapper extends Mapper {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // class01 69
        String line = value.toString();
        // 按空格进行分割,读取第一个数据,将其作为Key,例:class01
        String outkeys=line.split(" ")[0];
        // 按空格进行分割,读取第二个数据,将其作为Value,例:69
        // 将Text转化为十进制整数
        int outvalues=Integer.parseInt(line.split(" ")[1]);
        System.out.println(outkeys + ":" + outvalues);

        context.write(new Text(outkeys),new Text(String.valueOf(outvalues)));
    }
}

注意:将Map输出的Value变为Text。因为目标Combine格式为:人名 : 总成绩_考试次数 , 因为Combine使用的Context与Mapper保持一致,因此Map输出的value也需设为Text。

3. AvgCombine
package hadoop_test.avg_hmk_03;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class AvgCombine extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int num = 0;             // 计算考试次数
        int count = 0;            // 计算考试成绩总和
        for(Text value: values){
            count += Integer.parseInt(value.toString());
            num += 1;
        }
        System.out.println(key + ":" + count + "_" + num);
        // 因各个平均值累加起来再平均后与总平均值不一定相等,故以 Tom 68_4 的格式输出给Reduce
        context.write(new Text(key), new Text(String.valueOf(count) + "_" + String.valueOf(num)));
    }
}

4. AvgReducer
package hadoop_test.avg_hmk_03;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class AvgReducer  extends Reducer {
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        int num = 0;             // 计算考试次数
        int count = 0;            // 计算考试成绩总和
        for (Text value:
             values) {
            String subCou = value.toString().split("_")[0];
            String subNum = value.toString().split("_")[1];
            count += Integer.parseInt(subCou);
            num += Integer.parseInt(subNum);
        }
        double re = count / num;    // 求平均分

        context.write(new Text(key),new DoubleWritable(re));

    }
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/729281.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号