本博文出自51CTO博客 zengzhaozheng博主,有任何问题请进入博主页面互动讨论!
博文地址:http://zengzhaozheng.blog.51cto.com/8219051/1392961 |
一、概述
对于RDBMS中的join操作大伙一定非常熟悉,写sql的时候要十分注意细节,稍有差池就会耗时巨久造成很大的性能瓶颈,而在Hadoop中使用MapReduce框架进行join的操作时同样耗时,但是由于hadoop的分布式设计理念的特殊性,因此对于这种join操作同样也具备了一定的特殊性。本文主要对MapReduce框架对表之间的join操作的几种实现方式进行详细分析,并且根据我在实际开发过程中遇到的实际例子来进行进一步的说明。
二、实现原理
1、在Reudce端进行连接。
在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,***进行输出。
reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,***进行笛卡尔只就ok了。原理非常简单,下面来看一个实例:
(1)自定义一个value返回类型:
- package com.mr.reduceSizeJoin; import java.io.DataInput;
- import java.io.DataOutput; import java.io.IOException;
- import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable;
- public class Combinevalues implements WritableComparable{ //private static final Logger logger = LoggerFactory.getLogger(Combinevalues.class);
- private Text joinKey;//链接关键字 private Text flag;//文件来源标志
- private Text secondPart;//除了链接键外的其他部分 public void setJoinKey(Text joinKey) {
- this.joinKey = joinKey; }
- public void setFlag(Text flag) { this.flag = flag;
- } public void setSecondPart(Text secondPart) {
- this.secondPart = secondPart; }
- public Text getFlag() { return flag;
- } public Text getSecondPart() {
- return secondPart; }
- public Text getJoinKey() { return joinKey;
- } public Combinevalues() {
- this.joinKey = new Text(); this.flag = new Text();
- this.secondPart = new Text(); } @Override
- public void write(DataOutput out) throws IOException { this.joinKey.write(out);
- this.flag.write(out); this.secondPart.write(out);
- } @Override
- public void readFields(DataInput in) throws IOException { this.joinKey.readFields(in);
- this.flag.readFields(in); this.secondPart.readFields(in);
- } @Override
- public int compareTo(Combinevalues o) { return this.joinKey.compareTo(o.getJoinKey());
- } @Override
- public String toString() { // TODO Auto-generated method stub
- return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]"; }
- }
(2)map、reduce主体代码
- package com.mr.reduceSizeJoin; import java.io.IOException;
- import java.util.ArrayList; import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger;
- import org.slf4j.LoggerFactory; public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{
- private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class); public static class LeftOutJoinMapper extends Mapper
其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低。
2、在Map端进行连接。
使用场景:一张表十分小、一张表很大。
用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。
直接上代码,比较简单:
- package com.mr.mapSideJoin; import java.io.BufferedReader;
- import java.io.FileReader; import java.io.IOException;
- import java.util.HashMap; import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.conf.Configured; import org.apache.hadoop.filecache.DistributedCache;
- import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;
- import org.slf4j.Logger; import org.slf4j.LoggerFactory;
-
- public class MapSideJoinMain extends Configured implements Tool{ private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);
- public static class LeftOutJoinMapper extends Mapper
这里说说DistributedCache。DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性,接下来有用再整理研究一下写一篇blog,这里就不详细说了。
另外还有一种比较变态的Map Join方式,就是结合Hbase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。
3、SemiJoin。
SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。看代码:
- package com.mr.SemiJoin; import java.io.BufferedReader;
- import java.io.FileReader; import java.io.IOException;
- import java.util.ArrayList; import java.util.HashSet;
- import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured;
- import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner; import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class SemiJoin extends Configured implements Tool{ private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);
- public static class SemiJoinMapper extends Mapper
这里还说说SemiJoin也是有一定的适用范围的,其抽取出来进行join的key是要放到内存中的,所以不能够太大,容易在Map端造成OOM。
三、总结
blog介绍了三种join方式。这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率***,其次是SemiJoin,***的是reduce join。另外,写分布式大数据处理程序的时***要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到***,使我们的代码倾向性更好。