在mysql中,经常涉及到2张表或者多张表的关联查询,通常通过中间字段将两个表做关联,在MapReduce中,某些场景下也会遇到类似的需求,比如说,将两个原始的日志文件,通过中间业务字段进行关联,然后重新输出为一个新的文件
如下图所示,左边可理解为一张订单表,右边可理解为与之对应的商品表,相信大家一眼就能明白
现在提出需求,将这两个文件的数据,通过MapReduce输出得到下面这个格式的文件,该怎么做呢?
如果在mysql中,只需要将订单表和产品表通过产品ID做一下关联就可以得出结果,通过MapReduce该如何实现呢?
实现思路由于整个MapReduce的过程仍然是存在的,其实只需要想明白,要实现这种业务,Map阶段需要做什么,Reduce阶段做什么就可以了
现在来思考下,就像使用mysql做关联一样,肯定有一张表做驱动表,肯定要从这两个文件中选取一个关联字段进行关联
那么很容易联想到,Map阶段中是把读取进来的数据按照一行行读取,分割单词,不同列的单词代表着不同的含义,就需要在输出出去之前,把最终格式的结果封装好,但是这里的问题是,map方法中的输入参数只能来自于一个文件,另一个文件的数据怎么加载呢?
换句话说,在map方法中要完成结果的组装,肯定需要有两张表的关联操作,那么就容易理解了,肯定是在进入map方法之前,另一个文件的数据要加载到JVM内存中,然后在map方法中直接取出来进行关联操作即可
以上的思路就是本篇接下来要通过代码实现的一种思路,即mapjoin
mapjoin解决办法
有了上面的解决思路就好办了,在MapReduce中,可以通过下面的这行代码,在job类中加进去之后,再在自定义mapper的setup方法中就可以读取进来
job.addCacheFile()
于是很自然的联想到,在map阶段的setup方法中,将product文件中的数据读取并封装成map ,即 <产品编号,产品名称> -> <01,苹果> ,然后再在map方法中解析并匹配数据,最后输出即可
编码实现1、自定义Mapper
import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; public class JoinMapper extends Mapper{ private Map productMap = new HashMap<>(); private Text text = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { //通过job中设置的缓存文件得到pd.txt数据 URI[] cacheFiles = context.getCacheFiles(); Path path = new Path(cacheFiles[0]); //获取文件输入流 FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(path); //通过包装流转换为reader BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8")); //逐行读取 String line; while (StringUtils.isNotEmpty(line = reader.readLine())) { //切分为一行数据 //01 苹果 String[] split = line.split("t"); productMap.put(split[0], split[1]); } //关流 IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1001 01 1 String[] fields = value.toString().split("t"); //从map里面取出pname String pname = productMap.get(fields[1]); //数据拼接 String result = fields[0] + "t" + pname + "t" + fields[2]; text.set(result); context.write(text,NullWritable.get()); } }
2、自定义Reducer
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class JoinReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { for (NullWritable nullWritable : values){ context.write(key,NullWritable.get()); } } }
3、job类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
public class JoinJob {
public static void main(String[] args) throws Exception {
//1、获取job
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
//2、设置jar路径
job.setJarByClass(JoinJob.class);
//3、关联mapper 和 Reducer
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
//4、设置 map输出的 key/val 的类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//5、设置最终输出的key / val 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 加载缓存数据
job.addCacheFile(new URI("file:///F:/网盘/csv/order/product.txt"));
// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
//6、设置最终的输出路径
String inputPath = "F:\网盘\csv\order\order.txt";
String outPath = "F:\网盘\csv\order\result1";
FileInputFormat.setInputPaths(job,new Path(inputPath));
FileOutputFormat.setOutputPath(job,new Path(outPath));
// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
最后运行一下job类,观察输出结果,可以看到,文件格式即为本文开篇期望的效果



