目录
1.Reduce Join
2.Reduce Join案例实操
1)需求
2)需求分析
3)代码实现
4)测试
5)总结
3.Map Join
4.Map Join案例实操
1)需求
2)需求分析
3)实现代码
1.Reduce Join
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
2.Reduce Join案例实操
1)需求
表1 订单数据表t_order
| id | pid | amount |
| 1001 | 01 | 1 |
| 1002 | 02 | 2 |
| 1003 | 03 | 3 |
| 1004 | 01 | 4 |
| 1005 | 02 | 5 |
| 1006 | 03 | 6 |
表2 商品信息表t_product
| pid | pname |
| 01 | 小米 |
| 02 | 华为 |
| 03 | 格力 |
将商品信息表中数据根据商品pid合并到订单数据表中。
表3 最终数据形式
| id | pname | amount |
| 1001 | 小米 | 1 |
| 1004 | 小米 | 4 |
| 1002 | 华为 | 2 |
| 1005 | 华为 | 5 |
| 1003 | 格力 | 3 |
| 1006 | 格力 | 6 |
2)需求分析
通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联。
3)代码实现
(1)创建商品和订单合并后的TableBean类
public class TableBean implements Writable {
//id pid amount
//pid pname
private String id;//订单id
private String pid;//商品id
private int amount;//商品数量
private String pname;//商品名称
private String flag;//标记是什么表 order还是pd
//空参构造 序列化必须要的
public TableBean() {
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPid() {
return pid;
}
public void setPid(String pid) {
this.pid = pid;
}
public int getAmount() {
return amount;
}
public void setAmount(int amount) {
this.amount = amount;
}
public String getPname() {
return pname;
}
public void setPname(String pname) {
this.pname = pname;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(id);
out.writeUTF(pid);
out.writeInt(amount);
out.writeUTF(pname);
out.writeUTF(flag);
}
@Override
public void readFields(DataInput in) throws IOException {
this.id = in.readUTF();
this.pid = in.readUTF();
this.amount = in.readInt();
this.pname = in.readUTF();
this.flag = in.readUTF();
}
@Override
public String toString() {
return id + "t" + pname + "t" + "t" + amount;
}
}
(2)编写TableMapper类
public class TableMapper extends Mapper{ private String fileName; private Text outK = new Text(); private TableBean outV = new TableBean(); @Override protected void setup(Context context) throws IOException, InterruptedException { //获取各文件切片 FileSplit split = (FileSplit)context.getInputSplit(); //获取文件名称 //一种优化手段,只获取一次名称就够了 fileName = split.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1 获取一行 String line = value.toString(); //2 判断是哪个文件的 if (fileName.contains("order")) {//处理的是订单表 String[] split = line.split("t"); //封装k v outK.set(split[1]); outV.setId(split[0]); outV.setPid(split[1]); outV.setAmount(Integer.parseInt(split[2])); outV.setPname("");//设默认值 outV.setFlag("order"); } else {//处理的是商品表 String[] split = line.split("t"); outK.set(split[0]); outV.setId(""); outV.setPid(split[0]); outV.setAmount(0);//设默认值 outV.setPname(split[1]); outV.setFlag("pd"); } context.write(outK, outV); } }
(3)编写TableReducer类
public class TableReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { //这组数据进入同一个reduce方法 //01 1001 1 order //01 1004 4 order //01 小米 pd ArrayList orderBeans = new ArrayList<>(); TableBean pdBean = new TableBean(); //循环遍历 for (TableBean value : values) { if("order".equals(value.getFlag())){//订单表 //使用orderBeans.add(value),orderBeans加入的值是地址,实际上加入的是同一个地址,但是地址内的值一直在变, //最后加入orderBeans的都是添加的最后一个 //因此需要一个中间量,new出来的中间量地址都不一样,最后加入orderBeans的是不同对象 TableBean tmptableBean = new TableBean(); try { BeanUtils.copyProperties(tmptableBean,value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } orderBeans.add(tmptableBean); }else {//商品表 //因为pd.txt的一个pid只有一行,即reduce方法执行一次只处理一行的商品表内容,不需要像订单表一样那么麻烦 try { BeanUtils.copyProperties(pdBean,value); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } //循环遍历orderBeans,赋值pdName //因为以pid为key,reduce方法的一次执行中pid都是相同的,订单表就和商品表关联起来 for (TableBean orderBean : orderBeans) { orderBean.setPname(pdBean.getPname()); context.write(orderBean,NullWritable.get()); } } }
(4)编写TableDriver类
public class TableDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(TableDriver.class);
job.setMapperClass(TableMapper.class);
job.setReducerClass(TableReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(TableBean.class);
job.setOutputKeyClass(TableBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path("D:\code\Hadoop\input\inputtable"));
FileOutputFormat.setOutputPath(job, new Path("D:\code\Hadoop\output1"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
4)测试
5)总结
缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
解决方案:Map端实现数据合并。
3.Map Join
1)使用场景
Map Join适用于一张表十分小、一张表很大的场景。
2)优点
思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
3)具体办法:采用DistributedCache
(1)在Mapper的setup阶段,将文件读取到缓存集合中。
(2)在Driver驱动类中加载缓存。
//缓存普通文件到Task运行节点。
job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
//如果是集群运行,需要设置HDFS路径
job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
4.Map Join案例实操
1)需求
表1 订单数据表t_order
| id | pid | amount |
| 1001 | 01 | 1 |
| 1002 | 02 | 2 |
| 1003 | 03 | 3 |
| 1004 | 01 | 4 |
| 1005 | 02 | 5 |
| 1006 | 03 | 6 |
表2 商品信息表t_product
| pid | pname |
| 01 | 小米 |
| 02 | 华为 |
| 03 | 格力 |
将商品信息表中数据根据商品pid合并到订单数据表中。
表3 最终数据形式
| id | pname | amount |
| 1001 | 小米 | 1 |
| 1004 | 小米 | 4 |
| 1002 | 华为 | 2 |
| 1005 | 华为 | 5 |
| 1003 | 格力 | 3 |
| 1006 | 格力 | 6 |
2)需求分析
MapJoin适用于关联表中有小表的情形。
3)实现代码
(1)先在MapJoinDriver驱动类中添加缓存文件
public class MapJoinDriver {
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
// 1 获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置加载jar包路径
job.setJarByClass(MapJoinDriver.class);
// 3 关联mapper
job.setMapperClass(MapJoinMapper.class);
// 4 设置Map输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5 设置最终输出KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 加载缓存数据
job.addCacheFile(new URI("file:///D:/code/Hadoop/input/tablecache/pd.txt"));
// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
// 6 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("D:\code\Hadoop\input\inputtable2"));
FileOutputFormat.setOutputPath(job, new Path("D:\code\Hadoop\output2"));
// 7 提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
(2)在MapJoinMapper类中的setup方法中读取缓存文件
public class MapJoinMapper extends Mapper{ private HashMap pdMap = new HashMap<>(); private Text outK = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { //获取缓存的文件,并把文件内容封装到集合 pd.txt URI[] cacheFiles = context.getCacheFiles(); FileSystem fs = FileSystem.get(context.getConfiguration()); FSDataInputStream fis = fs.open(new Path(cacheFiles[0])); //从流中读取数据 BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8")); String line; //StringUtils导的包是org.apache.commons.lang while (StringUtils.isNotEmpty(line = reader.readLine())){ String[] fields = line.split("t"); //赋值 pdMap.put(fields[0], fields[1]); } //关流 IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //处理order.txt String line = value.toString(); String[] fields = line.split("t"); //获取pid String pname = pdMap.get(fields[1]); //获取订单id和订单数量 //封装 outK.set(fields[0] + "t" + pname + "t" + fields[2]); context.write(outK, NullWritable.get()); } }
不需要reduce类,全部在map端处理,map后的结果为最终结果。



