mapreduce中可以实现map端的join以及reduce端的join,我们看下有什么区别。
mapJoin与reduceJoin- 数据准备
- reduce join
- map join
- hive的map join测试
有一张订单表(order):
1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6
三列对应的字段分别是订单ID,产品ID以及产品数量。
然后还有一张产品表(pd):
01 小米 02 华为 03 格力
对应的字段分别是产品ID和产品名称。
显然,我们可以根据两张表公有的字段即产品ID进行join,然后我们希望订单表能够出现对应的产品名称,类似这样的结果:
1004 小米 4 1001 小米 1 1005 华为 5 1002 华为 2 1006 格力 6 1003 格力 3reduce join
为了装载这些字段我们先要准备一个bean:
public class TableBean implements Writable {
private String id; // 订单id
private String pid; // 商品id
private int amount; // 商品数量
private String pname;// 商品名称
private String flag; // 标记是什么表 order pd
// 空参构造
public TableBean() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}
@Override
public String toString() {
// id pname amount
return id + "t" + pname + "t" + amount ;
}
}
因为使用了flag字段所以我们可以用一个bean类装载两张表。
map端做的工作主要是读取数据,将数据赋给TableBean,并且以pid字段作为输出的key,以TableBean作为value输出,这样相同的的pid就能够进入同一个reducer了。
public class TableMapper extends Mapper{ private String fileName; private Text outK = new Text(); private TableBean outV = new TableBean(); @Override protected void setup(Context context) throws IOException, InterruptedException { // 初始化 order pd FileSplit split = (FileSplit) context.getInputSplit(); fileName = split.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 String line = value.toString(); // 2 判断是哪个文件的 // 处理的是订单表 if (fileName.contains("order")){ String[] split = line.split("t"); // 以pid作为key outK.set(split[1]); outV.setId(split[0]); outV.setPid(split[1]); outV.setAmount(Integer.parseInt(split[2])); //order表缺的是产品名称 outV.setPname(""); outV.setFlag("order"); }else { // 处理的是商品表 String[] split = line.split("t"); //以pid作为key outK.set(split[0]); //pd表却的是订单ID outV.setId(""); outV.setPid(split[0]); outV.setAmount(0); outV.setPname(split[1]); outV.setFlag("pd"); } // 写出,相同的key会进入同一个reducer context.write(outK, outV); } }
reducer端接受到的数据大概是:
pid=01 ===> List(TableBean(order1), TableBean(order2),TableBean(pd))
然后我们将pd中的产品名称赋给一个个拥有相同pid的order。
public class TableReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { // 准备初始化集合 ArrayList orderBeans = new ArrayList<>(); TableBean pdBean = new TableBean(); // 循环遍历 for (TableBean value : values) { // 订单表 if ("order".equals(value.getFlag())){ TableBean tmpTableBean = new TableBean(); try { BeanUtils.copyProperties(tmpTableBean,value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } orderBeans.add(tmpTableBean); }else { // 商品表 try { BeanUtils.copyProperties(pdBean,value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } // 循环遍历orderBeans,赋值 pdname for (TableBean orderBean : orderBeans) { //以order表作为主表 orderBean.setPname(pdBean.getPname()); context.write(orderBean,NullWritable.get()); } } }
但我们不建议reduce join,因为有reduce必然要经过shuffle(当然也不绝对),shuffle是很重的,传输的数据量大也会很慢,所以我们更希望在map端就可以完成join。
map join在map端完成join,就不需要reduce了。
map端join要借助一个类:DistributedCache。
相当于是把小表放到DistributedCache中,然后每一个mapTask都可以读到。这就相当于每一个mapTask都有一份小表的数据,这和spark中的广播变量是一个道理。
大表是可以被切分的,每个切片的字段和小表去join,如果能join上,那就输出。有多少个mapTask,就会有多少个输出结果。
首先,我们需要将小表的数据放入DistributedCache:
public class MapJoinDriver {
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置加载jar包路径
job.setJarByClass(MapJoinDriver.class);
// 3 关联mapper
job.setMapperClass(MapJoinMapper.class);
// 4 设置Map输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 设置最终输出KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 加载缓存数据
job.addCacheFile(new URI("file:///D:/hadoop/input/joininput/pd.txt"));
// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
// 6 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("D:\hadoop\input\joininput\order.txt"));
FileOutputFormat.setOutputPath(job, new Path("D:\hadoop\output\mapjoinout"));
// 7 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
job.addCacheFile(new URI("file:///D:/hadoop/input/joininput/pd.txt"));
完成了小表的公有化需求。
map端只要用流的api将小表读出来,数据放在一个HashMap里,然后让大表一个一个取,join的上的输出。
public class MapJoinMapper extends Mapper{ private HashMap pdMap = new HashMap<>(); private Text outK = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { // 获取缓存的文件,并把文件内容封装到集合 pd.txt(小表) URI[] cacheFiles = context.getCacheFiles(); FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(new Path(cacheFiles[0])); // 从流中读取数据 BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8")); String line; while (StringUtils.isNotEmpty(line = reader.readLine())) { // 切割 String[] fields = line.split("t"); // 赋值 pdMap.put(fields[0], fields[1]); } // 关流 IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 处理 order.txt String line = value.toString(); String[] fields = line.split("t"); // 获取pid(join的操作) String pname = pdMap.get(fields[1]); // 获取订单id 和订单数量 // 封装 outK.set(fields[0] + "t" + pname + "t" + fields[2]); context.write(outK, NullWritable.get()); } }
这样就没有shuffle了。这也是解决数据倾斜的一个思路。
hive的map join测试我们创建一张大表,一张小表,一个join后的表:
// 创建大表 create table bigtable(id bigint, `time` bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by 't'; // 创建小表 create table smalltable(id bigint, `time` bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by 't'; // 创建 join 后表的语句 create table jointable(id bigint, `time` bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by 't';
三张表的结构是一样的,数据大概是:
大表123M,小表12.4M。
假设我们关掉map join:
set hive.auto.convert.join = false;
默认他是打开的,但就算是打开的,也不一定会用map join。还必须满足小表在25M以下:
set hive.mapjoin.smalltable.filesize=25000000;
我们这里是满足的。
那就先关掉map join实验:
执行:
insert overwrite table jointable select b.id, b.`time`, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url from smalltable s left join bigtable b on b.id = s.id;
然后再开启测试时间。
开启map join后执行时间会更短。
另外,至于大表在前还是小表在前,hive已经做了优化,已经不用那么在意了。



