栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce实现两表关联查询

MapReduce实现两表关联查询

数据说明:现有贷款表和汇率表。

其中贷款表(以下简称表一)字段为项目序号,行业,项目类别,币种ID,贷款金额,实际提款额,已还本金额,利息金额。

汇率表(以下简称表二)字段为币种ID,兑换价格,币种名称。

问题描述:使用mapreduce进行数据处理,根据汇率表把贷款表中的币种ID替换成币种名称;根据汇率表在贷款表中增加贷款金额_人民币,实际提款额_人民币,已还本金额_人民币,利息金额_人民币 四列,汇率转换后的数据(汇率表中币种ID与 贷款表中币种ID进行关联)。

实现思路:在map阶段使用币种id作为key,其余数据添加不同标识(例:表一的字符串前面添加符合"$',表二的添加”#“)作为value写入context中。在reduce阶段对key相同的value进行处理,value来自两个部分,一个是贷款表的数据、另一个是汇率表的数据。我们可以创建两个容器(Vectoe va/vb)来存储表一、表二的数据。等相同key值的数据全部分类进入容器的时候,我们就可以继续对容器进行遍历操作,取出对应的字符串进行数据替换或数据处理。

代码如下:

map任务

package cn.lzr.bka;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class JobMapper1 extends Mapper {

	@Override
	protected void map(LongWritable key, Text value, Mapper.Context context)
			throws IOException, InterruptedException {
			Path path =( (FileSplit) context.getInputSplit()).getPath();
			String filePath = path.getName().toString();
			String[] split = value.toString().split(",");
			if(filePath.contains("Foreign_Government_Loans")) {
				String id = split[3];   //获得币种id写入key
				String othString = split[0]+","+split[1]+","+split[2]+","+split[4]+","+split[5]+","+split[6]+","+split[7]+",";   //其余字段写回value
				context.write(new Text(id), new Text("$"+othString));    //在字符串最开头添加标识"$"
			}
			else if(filePath.contains("huilv")) {
				String id = split[0];   //获得币种id
				String othString = split[1]+","+split[2];
				context.write(new Text(id), new Text("#"+othString));
			}
	}
}

reduce任务

package cn.lzr.bka;

import java.io.IOException;
import java.util.Vector;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JobReduce1 extends Reducer{
	@Override
	protected void reduce(Text key, Iterable values, Reducer.Context context)
			throws IOException, InterruptedException {
		Vector va = new Vector();
		Vector vb = new Vector();
		for(Text value:values) {
			String line = value.toString();
			if(line.startsWith("$")) {
				va.add(line.substring(1));     //来自表一的数据,记得删除标识符号
			}
			else if(line.startsWith("#")){
				vb.add(line.substring(1));
			}
		}
		int sizea = va.size();    //获取容器长度
		int sizeb = vb.size();
		int i,j;
		for(i=0;i 

main任务

package cn.lzr.bka;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReducebase;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobMain extends Configured implements Tool {

	public int run(String[] arg0) throws Exception {
		//创建一个job任务
		Job job = Job.getInstance(super.getConf(),"join");
		//配置job任务
		job.setInputFormatClass(TextInputFormat.class);
		TextInputFormat.addInputPath(job, new Path("input/"));
		//指定map
		job.setMapperClass(JobMapper1.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		//指定reduce
		job.setReducerClass(JobReduce1.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		//设置输出类型
		Path path = new Path("Foreign_chang");     //指定输出目录
		job.setOutputFormatClass(TextOutputFormat.class);
		TextOutputFormat.setOutputPath(job,path);
		//判断是否存在目录
		FileSystem file = FileSystem.get(new URI("Foreign_chang"),new Configuration());
		boolean bl2 = file.exists(path);
		if(bl2) {
			file.delete(path, true);
		}
		boolean bl = job.waitForCompletion(true);
		return bl?0:1;
	}
	public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();
		conf.set("mapred.textoutputformat.ignoreseparator","true");
		conf.set("mapred.textoutputformat.separator","");
		int run = ToolRunner.run(conf, new JobMain(),args);
		System.exit(run);
	}
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/336166.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号