1. AvgDriver

package hadoop_test.avg_demo_03;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AvgDriver {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(AvgDriver.class);
job.setMapperClass(AvgMapper.class);
job.setReducerClass(AvgReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.setInputPaths(job, new Path("/hadoop_test/avg/avg.txt"));
FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/avg/result"));
job.waitForCompletion(true);
}
}
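Note that FileOutputFormat requires the output directory to not exist yet; if /hadoop_test/avg/result is left over from an earlier run, the job fails with a FileAlreadyExistsException. The combiner version later in this post deletes the directory before submitting the job.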
2. AvgMapper
package hadoop_test.avg_demo_03;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AvgMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // input line, e.g. "class01 69"
        String line = value.toString();
        // split on the space; the first field becomes the key, e.g. "class01"
        String outkeys = line.split(" ")[0];
        // the second field, parsed as a decimal integer, becomes the value, e.g. 69
        int outvalues = Integer.parseInt(line.split(" ")[1]);
        context.write(new Text(outkeys), new IntWritable(outvalues));
    }
}

3. AvgReducer
package hadoop_test.avg_demo_03;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class AvgReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int flag = 0;   // number of exams
        int count = 0;  // sum of the scores
        for (IntWritable value : values) {
            count += value.get();
            flag += 1;
        }
        // cast before dividing: int / int would silently truncate the average
        double re = (double) count / flag;
        context.write(new Text(key), new DoubleWritable(re));
    }
}
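For orientation, here is a hypothetical input in the "class01 69" format described in the mapper's comments, together with the tab-separated per-key averages the job would write:

avg.txt (hypothetical):
class01 69
class01 88
class02 77

part-r-00000:
class01	78.5
class02	77.0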
With Combiner

0. Project structure

1. AvgDriver

package hadoop_test.avg_hmk_03;
import hadoop_test.Utils_hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class AvgDriver {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(AvgDriver.class);
job.setMapperClass(AvgMapper.class);
job.setCombinerClass(AvgCombine.class);
job.setReducerClass(AvgReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class); // note: the map output value type is now Text
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
FileInputFormat.setInputPaths(job, new Path("/hadoop_test/avg/avg.txt"));
// remove the output directory if it already exists, otherwise the job would fail
if (Utils_hadoop.testExist(conf, "/hadoop_test/avg/homework_result")) {
    Utils_hadoop.rmDir(conf, "/hadoop_test/avg/homework_result");
}
FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/avg/homework_result"));
job.waitForCompletion(true);
}
}
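Utils_hadoop is a project-local helper that is not shown in this post. A minimal sketch of what testExist and rmDir presumably do, built on the standard org.apache.hadoop.fs.FileSystem API (the method names come from the driver above; the bodies are an assumption):

package hadoop_test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

// Hypothetical reconstruction of the helper used by the driver above.
public class Utils_hadoop {

    // true if the given path exists on the FileSystem configured in conf
    public static boolean testExist(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        return fs.exists(new Path(path));
    }

    // recursively delete the given path (the boolean enables recursion)
    public static void rmDir(Configuration conf, String path) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        fs.delete(new Path(path), true);
    }
}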
2. AvgMapper
package hadoop_test.avg_hmk_03;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class AvgMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // input line, e.g. "class01 69"
        String line = value.toString();
        // split on the space; the first field becomes the key, e.g. "class01"
        String outkeys = line.split(" ")[0];
        // the second field, parsed as a decimal integer, e.g. 69
        int outvalues = Integer.parseInt(line.split(" ")[1]);
        System.out.println(outkeys + ":" + outvalues);
        // emit the score as Text so the combiner can consume and produce the same type
        context.write(new Text(outkeys), new Text(String.valueOf(outvalues)));
    }
}
Note: the map output value is changed to Text. The target combine format is name : totalScore_examCount, and because the combiner runs with the mapper's output types (its input and output types must both match the map output, since it may run zero or more times), the map output value must be Text as well.
3. AvgCombine

package hadoop_test.avg_hmk_03;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class AvgCombine extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int num = 0;    // number of exams
        int count = 0;  // sum of the scores
        for (Text value : values) {
            count += Integer.parseInt(value.toString());
            num += 1;
        }
        System.out.println(key + ":" + count + "_" + num);
        // averaging partial averages would not equal the overall average,
        // so pass "totalScore_examCount" pairs (e.g. "Tom 68_4") on to the reducer
        context.write(new Text(key), new Text(String.valueOf(count) + "_" + String.valueOf(num)));
    }
}

4. AvgReducer
package hadoop_test.avg_hmk_03;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class AvgReducer extends Reducer<Text, Text, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int num = 0;    // total number of exams
        int count = 0;  // total score
        for (Text value : values) {
            String subCou = value.toString().split("_")[0];
            String subNum = value.toString().split("_")[1];
            count += Integer.parseInt(subCou);
            num += Integer.parseInt(subNum);
        }
        // cast before dividing: int / int would silently truncate the average
        double re = (double) count / num;
        context.write(new Text(key), new DoubleWritable(re));
    }
}
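To see why the combiner must emit sum_count pairs rather than partial averages, trace some hypothetical scores for one key through the pipeline. Suppose Tom's records are split across two map tasks:

map task 1 emits Tom 60, Tom 70  ->  combiner emits Tom "130_2"
map task 2 emits Tom 90          ->  combiner emits Tom "90_1"
reducer: count = 130 + 90 = 220, num = 2 + 1 = 3  ->  220 / 3 ≈ 73.33

Averaging the partial averages instead would give (65 + 90) / 2 = 77.5, which is wrong. Keeping sums and counts separate until the final division preserves the true average, and it also keeps the combiner safe to run zero, one, or several times.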