
[Hadoop Learning Project] 9. Running Multiple MapReduce Jobs with Dependencies

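0. Project structure


Input data
Each line holds a record index, the company name, the total income, and the total expenses, separated by single spaces.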

1 apple 1520 100
2 apple 3421 254
3 apple 4500 364
1 huawei 3700 254
2 huawei 2700 354
3 huawei 5700 554
1 xiaomi 3521 254
2 xiaomi 3123 354
3 xiaomi 3412 554

Goal: compute each company's total profit, i.e. total income minus total expenses.

1. company

DoubleMr.java

package hadoop_test.mutil_mr_10.company;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DoubleMr implements WritableComparable<DoubleMr> {

	private String name;
	private int profit;
	
	
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(name);
		out.writeInt(profit);
		
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.name=in.readUTF();
		this.profit=in.readInt();
		
	}

	@Override
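	public int compareTo(DoubleMr o) {
		// Ascending order by profit; Integer.compare avoids the overflow that subtraction can cause
		return Integer.compare(this.profit, o.profit);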
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getProfit() {
		return profit;
	}

	public void setProfit(int profit) {
		this.profit = profit;
	}

	@Override
	public String toString() {
		return  name+" "+profit;
	}
	

}
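
The write and readFields methods above must handle the fields in the same order, because the serialized form carries no field names. The following is a minimal sketch of that round trip using only the JDK. CompanyRecord is a hypothetical stand-in for DoubleMr without the Hadoop interfaces, so it can be run without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for DoubleMr (not part of the original project).
class CompanyRecord {
    String name;
    int profit;

    // Same field order as DoubleMr.write: name first, then profit.
    void write(DataOutputStream out) throws IOException {
        out.writeUTF(name);
        out.writeInt(profit);
    }

    // Fields must be read back in exactly the order they were written.
    void readFields(DataInputStream in) throws IOException {
        name = in.readUTF();
        profit = in.readInt();
    }

    // Serializes a record to bytes and deserializes it into a fresh object.
    static CompanyRecord roundTrip(String name, int profit) {
        try {
            CompanyRecord original = new CompanyRecord();
            original.name = name;
            original.profit = profit;

            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.write(out);
            }

            CompanyRecord copy = new CompanyRecord();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CompanyRecord copy = roundTrip("apple", 1420);
        System.out.println(copy.name + " " + copy.profit);
    }
}
```

If readFields read the int before the string, the bytes would be misinterpreted, which is why the two methods are kept symmetric.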

2. mr1 FirstMrDriver
package hadoop_test.mutil_mr_10.mr1;

import hadoop_test.Utils_hadoop;
import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FirstMrDriver {

	public static void main(String[] args) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "root");

		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);

		job.setJarByClass(FirstMrDriver.class);
	
		job.setMapperClass(FirstMrMapper.class);
		job.setReducerClass(FirstMrReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DoubleMr.class);
		job.setOutputKeyClass(DoubleMr.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job,new Path("/hadoop_test/muti_mr/mutil_mr.txt"));	
		FileOutputFormat.setOutputPath(job,new Path("/hadoop_test/muti_mr/result"));
		
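		// Exit with a non-zero code if the job fails, so a caller can tell whether mr2 may be started
		System.exit(job.waitForCompletion(true) ? 0 : 1);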
	}
}

FirstMrMapper
package hadoop_test.mutil_mr_10.mr1;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FirstMrMapper extends Mapper<LongWritable, Text, Text, DoubleMr> {

	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Input line format: index company income expenses
		String line = value.toString();
		String[] data = line.split(" ");
		DoubleMr mr = new DoubleMr();
		// The company name is used as the output key
		mr.setName(data[1]);
		// The profit (income - expenses) is carried in the output value
		mr.setProfit(Integer.parseInt(data[2])-Integer.parseInt(data[3]));
		
		context.write(new Text(mr.getName()), mr);
	}
}
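
The per-line work of the mapper can be tried out without a cluster. The sketch below repeats the same parsing in plain Java; ProfitLineParser is a hypothetical helper name, and splitting on "\\s+" with a field-count check is an added assumption to tolerate repeated spaces, whereas the mapper above splits on a single space.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

// Hypothetical helper (not part of the original project) that mirrors FirstMrMapper.map.
class ProfitLineParser {

    // Parses "index company income expenses" into (company, income - expenses).
    static Map.Entry<String, Integer> parse(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 4) {
            throw new IllegalArgumentException("expected 4 fields: " + line);
        }
        int income = Integer.parseInt(fields[2]);
        int expenses = Integer.parseInt(fields[3]);
        return new SimpleEntry<>(fields[1], income - expenses);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> record = parse("1 apple 1520 100");
        System.out.println(record.getKey() + " " + record.getValue()); // prints "apple 1420"
    }
}
```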

FirstMrReducer
package hadoop_test.mutil_mr_10.mr1;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FirstMrReducer extends Reducer<Text, DoubleMr, DoubleMr, NullWritable> {
	
	@Override
	protected void reduce(Text key, Iterable<DoubleMr> values, Context context) throws IOException, InterruptedException {
		DoubleMr tmp = new DoubleMr();
		tmp.setName(key.toString());
		for(DoubleMr mr:values){
			tmp.setProfit(tmp.getProfit() + mr.getProfit());
		}
		context.write(tmp, NullWritable.get());
	}

}
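
To check what mr1 should produce for the sample data, the map and reduce steps can be imitated in plain Java: compute the profit per line, then sum the profits per company. This is only a local sketch of the same arithmetic, not Hadoop code; ProfitTotals is a hypothetical class name.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local check (not part of the original project).
class ProfitTotals {

    // The sample input from the article.
    static final String[] SAMPLE = {
        "1 apple 1520 100", "2 apple 3421 254", "3 apple 4500 364",
        "1 huawei 3700 254", "2 huawei 2700 354", "3 huawei 5700 554",
        "1 xiaomi 3521 254", "2 xiaomi 3123 354", "3 xiaomi 3412 554"
    };

    // Groups by company name and sums (income - expenses), like FirstMrReducer does per key.
    static Map<String, Integer> totals(String[] lines) {
        Map<String, Integer> result = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            int profit = Integer.parseInt(fields[2]) - Integer.parseInt(fields[3]);
            result.merge(fields[1], profit, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints {apple=8723, huawei=10938, xiaomi=8894}
        System.out.println(totals(SAMPLE));
    }
}
```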

3. mr2 SecondMrDriver
package hadoop_test.mutil_mr_10.mr2;

import hadoop_test.Utils_hadoop;
import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SecondMrDriver {
	public static void main(String[] args) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "root");

		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);

		job.setJarByClass(SecondMrDriver.class);
		job.setMapperClass(SecondMrMapper.class);

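		// No reducer class is set: the default identity reducer is used,
		// and the shuffle sorts the keys with DoubleMr.compareTo
		job.setMapOutputKeyClass(DoubleMr.class);
		job.setMapOutputValueClass(NullWritable.class);
		job.setOutputKeyClass(DoubleMr.class);
		job.setOutputValueClass(NullWritable.class);
		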
		FileInputFormat.setInputPaths(job,new Path("/hadoop_test/muti_mr/result"));
		FileOutputFormat.setOutputPath(job,new Path("/hadoop_test/muti_mr/result01"));
		
		// Exit with a non-zero code if the job fails
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

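mr2 depends on the output of mr1: it reads the directory /hadoop_test/muti_mr/result that mr1 writes (result01 is mr2's own output directory), so mr2 can only start properly once mr1 has finished writing its output.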
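
One way to enforce this ordering is to run both jobs from a single driver and start the second only when the first reports success. Job.waitForCompletion(true) returns true when the job succeeds, so it can serve as that signal. The sketch below shows only the control flow in plain Java: JobChain and runInOrder are hypothetical names, and each BooleanSupplier stands in for a wrapped job.waitForCompletion(true) call (which throws checked exceptions that a real wrapper would need to handle).

```java
import java.util.function.BooleanSupplier;

// Hypothetical control-flow sketch (not part of the original project).
class JobChain {

    // Runs the second stage only if the first reports success.
    // Returns true only when both stages succeed.
    static boolean runInOrder(BooleanSupplier first, BooleanSupplier second) {
        if (!first.getAsBoolean()) {
            return false; // the second stage is never started
        }
        return second.getAsBoolean();
    }

    public static void main(String[] args) {
        // Stand-ins for the two jobs; a real driver would call waitForCompletion here.
        BooleanSupplier mr1 = () -> { System.out.println("mr1 finished"); return true; };
        BooleanSupplier mr2 = () -> { System.out.println("mr2 finished"); return true; };
        boolean ok = runInOrder(mr1, mr2);
        System.out.println(ok ? "both jobs succeeded" : "chain failed");
    }
}
```

Running the two drivers one after another from a script that checks the exit code of the first achieves the same effect.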

SecondMrMapper
package hadoop_test.mutil_mr_10.mr2;

import hadoop_test.mutil_mr_10.company.DoubleMr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

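public class SecondMrMapper extends Mapper<LongWritable, Text, DoubleMr, NullWritable> {
	
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// Each input line is one mr1 output record: "name profit"
		String line = value.toString();
		String[] data = line.split(" ");
		DoubleMr mr = new DoubleMr();
		mr.setName(data[0]);
		mr.setProfit(Integer.parseInt(data[1]));
//		Caveat: sorting on the map side is only local to each task, which matters if a global sort is wanted
//		context.write(1, mr);
		// Emit the record as the key so that the shuffle sorts it by DoubleMr.compareTo (profit)
		context.write(mr, NullWritable.get());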
	}

}
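
Because mr2 emits each record as the key, the ordering of its output is decided by DoubleMr.compareTo, i.e. ascending profit. The sketch below imitates that ordering locally on the per-company totals computed by hand from the sample input (apple 8723, huawei 10938, xiaomi 8894). ProfitSort is a hypothetical class name. It also shows why a subtraction-based comparison of int fields is risky: for values far apart, the difference overflows and the sign flips, which Integer.compare avoids.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical local check (not part of the original project).
class ProfitSort {

    // Sorts "name profit" lines in ascending order of profit.
    static List<String> sortByProfit(List<String> lines) {
        List<String> sorted = new ArrayList<>(lines);
        sorted.sort(Comparator.comparingInt(
                (String line) -> Integer.parseInt(line.split(" ")[1])));
        return sorted;
    }

    // Comparison by subtraction: wrong when a - b overflows the int range.
    static int compareBySubtraction(int a, int b) {
        return a - b;
    }

    public static void main(String[] args) {
        List<String> totals = Arrays.asList("apple 8723", "huawei 10938", "xiaomi 8894");
        // prints [apple 8723, xiaomi 8894, huawei 10938]
        System.out.println(sortByProfit(totals));

        int a = 2_000_000_000;
        int b = -2_000_000_000;
        // a is larger than b, yet the subtraction overflows to a negative number
        System.out.println(compareBySubtraction(a, b) < 0); // prints true
        System.out.println(Integer.compare(a, b) > 0);      // prints true
    }
}
```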

Output
Output of mr1

Output after mr2
