Input data
Columns: record no., company name, total revenue, total expenditure
1 apple 1520 100
2 apple 3421 254
3 apple 4500 364
1 huawei 3700 254
2 huawei 2700 354
3 huawei 5700 554
1 xiaomi 3521 254
2 xiaomi 3123 354
3 xiaomi 3412 554
Goal: compute each company's total profit, i.e. total revenue minus total expenditure.
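For a single company this is plain arithmetic: apple's three records contribute (1520 - 100) + (3421 - 254) + (4500 - 364) = 1420 + 3167 + 4136 = 8723 in total profit. The point of the exercise is doing this at scale with two chained MapReduce jobs: mr1 sums profit per company, mr2 sorts the totals.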
1. company
DoubleMr.java
package hadoop_test.mutil_mr_10.company;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DoubleMr implements WritableComparable<DoubleMr> {
    private String name;
    private int profit;

    // serialize the fields in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(profit);
    }

    // deserialize in exactly the same order as write()
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.profit = in.readInt();
    }

    // sort ascending by profit
    @Override
    public int compareTo(DoubleMr o) {
        return this.profit - o.profit;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getProfit() { return profit; }
    public void setProfit(int profit) { this.profit = profit; }

    @Override
    public String toString() {
        return name + " " + profit;
    }
}
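As a quick local sanity check (not part of the original post; the DoubleMrDemo class below is a hypothetical helper in the same package), a round trip through write()/readFields() shows why the two methods must mirror each other:

package hadoop_test.mutil_mr_10.company;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical demo: round-trips a DoubleMr through its own serialization
// to confirm readFields() reads back exactly what write() wrote.
public class DoubleMrDemo {
    public static void main(String[] args) throws IOException {
        DoubleMr before = new DoubleMr();
        before.setName("apple");
        before.setProfit(1420);   // 1520 - 100, apple's first record

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        before.write(new DataOutputStream(buf));

        DoubleMr after = new DoubleMr();
        after.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(after);   // prints "apple 1420"
    }
}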
2. mr1
FirstMrDriver

package hadoop_test.mutil_mr_10.mr1;
import hadoop_test.Utils_hadoop;
import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FirstMrDriver {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(FirstMrDriver.class);
job.setMapperClass(FirstMrMapper.class);
job.setReducerClass(FirstMrReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleMr.class);
job.setOutputKeyClass(DoubleMr.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("/hadoop_test/muti_mr/mutil_mr.txt"));
FileOutputFormat.setOutputPath(job,new Path("/hadoop_test/muti_mr/result"));
job.waitForCompletion(true);
}
}
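One practical note: FileOutputFormat fails the job if the output directory already exists, so re-running the driver needs a cleanup step first. A minimal sketch using the HDFS FileSystem API (the OutputCleaner class and its name are assumptions; the imported Utils_hadoop helper presumably wraps something similar):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputCleaner {
    // Delete a stale output directory left over from a previous run,
    // since FileOutputFormat refuses to write into an existing directory.
    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path(dir);
        if (fs.exists(out)) {
            fs.delete(out, true);   // true = recursive
        }
    }
}

Calling OutputCleaner.deleteIfExists(conf, "/hadoop_test/muti_mr/result") before FileOutputFormat.setOutputPath(...) makes the driver safely re-runnable.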
FirstMrMapper
package hadoop_test.mutil_mr_10.mr1;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FirstMrMapper extends Mapper<LongWritable, Text, Text, DoubleMr> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] data = line.split(" ");
        DoubleMr mr = new DoubleMr();
        // company name as the keyOut
        mr.setName(data[1]);
        // profit = total revenue - total expenditure, as the valueOut
        mr.setProfit(Integer.parseInt(data[2]) - Integer.parseInt(data[3]));
        context.write(new Text(mr.getName()), mr);
    }
}

FirstMrReducer
package hadoop_test.mutil_mr_10.mr1;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FirstMrReducer extends Reducer<Text, DoubleMr, DoubleMr, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<DoubleMr> values, Context context)
            throws IOException, InterruptedException {
        DoubleMr tmp = new DoubleMr();
        tmp.setName(key.toString());
        // accumulate the per-record profits into one total for this company
        for (DoubleMr mr : values) {
            tmp.setProfit(tmp.getProfit() + mr.getProfit());
        }
        context.write(tmp, NullWritable.get());
    }
}

3. mr2
SecondMrDriver
package hadoop_test.mutil_mr_10.mr2;
import hadoop_test.Utils_hadoop;
import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SecondMrDriver {
public static void main(String[] args) throws Exception {
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(SecondMrDriver.class);
job.setMapperClass(SecondMrMapper.class);
// No reducer class is set, so the default identity Reducer runs; the shuffle
// sorts the DoubleMr keys (by profit, via compareTo) on the way to it.
job.setMapOutputKeyClass(DoubleMr.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(DoubleMr.class);
job.setOutputValueClass(NullWritable.class);
// a single reduce task (the default) is what makes the sort global
job.setNumReduceTasks(1);
FileInputFormat.setInputPaths(job,new Path("/hadoop_test/muti_mr/result"));
FileOutputFormat.setOutputPath(job,new Path("/hadoop_test/muti_mr/result01"));
job.waitForCompletion(true);
}
}
mr2's input depends on the file output of mr1 (the result directory); only after mr1 has finished writing its results can mr2 start and run correctly, as sketched below.
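One way to encode that dependency explicitly (a minimal sketch; the original runs the two drivers by hand, and the ChainDriver class name is an assumption) is a single driver that submits mr2 only after mr1 completes successfully, since waitForCompletion(true) blocks and reports success:

package hadoop_test.mutil_mr_10;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import hadoop_test.mutil_mr_10.mr1.FirstMrMapper;
import hadoop_test.mutil_mr_10.mr1.FirstMrReducer;
import hadoop_test.mutil_mr_10.mr2.SecondMrMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical combined driver: mr2 is submitted only after mr1 succeeds.
public class ChainDriver {
    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();

        Job job1 = Job.getInstance(conf, "profit-sum");
        job1.setJarByClass(ChainDriver.class);
        job1.setMapperClass(FirstMrMapper.class);
        job1.setReducerClass(FirstMrReducer.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(DoubleMr.class);
        job1.setOutputKeyClass(DoubleMr.class);
        job1.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job1, new Path("/hadoop_test/muti_mr/mutil_mr.txt"));
        FileOutputFormat.setOutputPath(job1, new Path("/hadoop_test/muti_mr/result"));

        // block until mr1 finishes; abort the chain if it failed
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        Job job2 = Job.getInstance(conf, "profit-sort");
        job2.setJarByClass(ChainDriver.class);
        job2.setMapperClass(SecondMrMapper.class);
        job2.setMapOutputKeyClass(DoubleMr.class);
        job2.setMapOutputValueClass(NullWritable.class);
        job2.setOutputKeyClass(DoubleMr.class);
        job2.setOutputValueClass(NullWritable.class);
        // mr2 reads mr1's output directory
        FileInputFormat.setInputPaths(job2, new Path("/hadoop_test/muti_mr/result"));
        FileOutputFormat.setOutputPath(job2, new Path("/hadoop_test/muti_mr/result01"));

        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}

For longer pipelines, org.apache.hadoop.mapreduce.lib.jobcontrol (ControlledJob/JobControl) can declare such dependencies instead of sequencing them manually.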
SecondMrMapper

package hadoop_test.mutil_mr_10.mr2;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SecondMrMapper extends Mapper<LongWritable, Text, DoubleMr, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] data = line.split(" ");
        DoubleMr mr = new DoubleMr();
        mr.setName(data[0]);
        mr.setProfit(Integer.parseInt(data[1]));
        // Sorting in the mapper alone would only be a partial (per-split) sort;
        // for a global sort, emit DoubleMr itself as the key so the shuffle
        // orders all records by profit via compareTo().
        // context.write(1, mr);
        context.write(mr, NullWritable.get());
    }
}
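A design note (a sketch, not from the original): the sort direction is fixed by DoubleMr.compareTo, which orders ascending by profit. A hypothetical descending variant inside DoubleMr would simply flip the comparison; Integer.compare also avoids overflow on extreme values:

// Hypothetical variant of DoubleMr.compareTo for descending profit order.
@Override
public int compareTo(DoubleMr o) {
    return Integer.compare(o.getProfit(), this.getProfit());
}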
Output results
mr1's output (the result directory):
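Hand-computed from the input data (see the apple example above; huawei and xiaomi follow the same arithmetic), with reduce keys arriving in alphabetical order:

apple 8723
huawei 10938
xiaomi 8894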
After mr2 (the result01 directory), the same totals globally sorted in ascending profit order:
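Again hand-computed; DoubleMr.toString() gives each line its "name profit" form:

apple 8723
xiaomi 8894
huawei 10938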



