数据说明:现有贷款表和汇率表。
其中贷款表(以下简称表一)字段为项目序号,行业,项目类别,币种ID,贷款金额,实际提款额,已还本金额,利息金额。
汇率表(以下简称表二)字段为币种ID,兑换价格,币种名称。
问题描述:使用mapreduce进行数据处理,根据汇率表把贷款表中的币种ID替换成币种名称;根据汇率表在贷款表中增加贷款金额_人民币,实际提款额_人民币,已还本金额_人民币,利息金额_人民币 四列,汇率转换后的数据(汇率表中币种ID与 贷款表中币种ID进行关联)。
实现思路:在map阶段使用币种ID作为key,其余数据添加不同标识(例如:来自表一的字符串前面添加符号"$",来自表二的添加"#")作为value写入context中。在reduce阶段对key相同的value进行处理,value来自两个部分:一部分是贷款表的数据,另一部分是汇率表的数据。我们可以创建两个容器(Vector),一个存放表一的数据,另一个存放表二的数据,最后对两个容器做笛卡尔积拼接,完成关联与汇率换算后输出。
代码如下:
map任务
package cn.lzr.bka; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class JobMapper1 extends Mapper{ @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { Path path =( (FileSplit) context.getInputSplit()).getPath(); String filePath = path.getName().toString(); String[] split = value.toString().split(","); if(filePath.contains("Foreign_Government_Loans")) { String id = split[3]; //获得币种id写入key String othString = split[0]+","+split[1]+","+split[2]+","+split[4]+","+split[5]+","+split[6]+","+split[7]+","; //其余字段写回value context.write(new Text(id), new Text("$"+othString)); //在字符串最开头添加标识"$" } else if(filePath.contains("huilv")) { String id = split[0]; //获得币种id String othString = split[1]+","+split[2]; context.write(new Text(id), new Text("#"+othString)); } } }
reduce任务
package cn.lzr.bka;

import java.io.IOException;
import java.util.Vector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Reduce side of the join: for each currency id, pair every loan row with
 * every exchange-rate row (a cartesian product — normally 1 rate per id),
 * replace the currency id with the currency name, and append four
 * RMB-converted amount columns (amount * exchange rate).
 *
 * NOTE(review): the original posting was truncated mid-loop by an HTML
 * renderer ("for(i=0;i ..."); the join loop below is reconstructed from the
 * problem statement and the mapper's output layout — confirm against the
 * original source.
 */
public class JobReduce1 extends Reducer<Text, Text, Text, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Vector<String> va = new Vector<String>(); // loan-table rows for this id
        Vector<String> vb = new Vector<String>(); // rate-table rows for this id
        for (Text value : values) {
            String line = value.toString();
            if (line.startsWith("$")) {
                va.add(line.substring(1)); // table one — strip the "$" tag
            } else if (line.startsWith("#")) {
                vb.add(line.substring(1)); // table two — strip the "#" tag
            }
        }
        int sizea = va.size();
        int sizeb = vb.size();
        for (int i = 0; i < sizea; i++) {
            for (int j = 0; j < sizeb; j++) {
                // Mapper emitted: 序号,行业,类别,贷款金额,提款额,已还本金,利息
                // (trailing comma → 7 fields, indices 0..6; amounts at 3..6).
                String[] loan = va.get(i).split(",");
                // Rate record: 兑换价格,币种名称
                String[] rate = vb.get(j).split(",");
                double price = Double.parseDouble(rate[0]);
                String currencyName = rate[1];

                StringBuilder sb = new StringBuilder();
                // Currency id replaced by currency name after the first 3 fields.
                sb.append(loan[0]).append(",").append(loan[1]).append(",")
                  .append(loan[2]).append(",").append(currencyName);
                for (int k = 3; k <= 6; k++) {
                    sb.append(",").append(loan[k]); // original amount columns
                }
                for (int k = 3; k <= 6; k++) {
                    // Appended RMB columns: amount converted at this rate.
                    sb.append(",").append(Double.parseDouble(loan[k]) * price);
                }
                context.write(new Text(sb.toString()), NullWritable.get());
            }
        }
    }
}

main任务
package cn.lzr.bka; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReducebase; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class JobMain extends Configured implements Tool { public int run(String[] arg0) throws Exception { //创建一个job任务 Job job = Job.getInstance(super.getConf(),"join"); //配置job任务 job.setInputFormatClass(TextInputFormat.class); TextInputFormat.addInputPath(job, new Path("input/")); //指定map job.setMapperClass(JobMapper1.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //指定reduce job.setReducerClass(JobReduce1.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //设置输出类型 Path path = new Path("Foreign_chang"); //指定输出目录 job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,path); //判断是否存在目录 FileSystem file = FileSystem.get(new URI("Foreign_chang"),new Configuration()); boolean bl2 = file.exists(path); if(bl2) { file.delete(path, true); } boolean bl = job.waitForCompletion(true); return bl?0:1; } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); conf.set("mapred.textoutputformat.ignoreseparator","true"); conf.set("mapred.textoutputformat.separator",""); int run = ToolRunner.run(conf, new JobMain(),args); System.exit(run); } }



