原理:在map task执行的时候,会先小表的数据读入memory中,每次在map函数遍历大表的时候,就会查找memory中对应相同join key的记录集,然后和大表中的数据连接,其实Map-side Join连表和sql语句中的join是一样的。你可以这样理解:在map处理大文件后,读取小文件的时候,他就会做连接。
适用场景:Map-side Join适用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。
废话不多说,直接用例子说明:
两个txt文件:(数据太多,需要数据可以私聊我)
movies.txt:电影ID:电影名称:电影类别
ratings.txt:用户ID:电影ID:用户评分:评分时间
每个用户会对一个或者多个电影进行评分
问题:找出评论时间最晚的10条评论,输出电影名和评论时间
输出样例:
电影名称 评分时间
很明显,这两个表是通过电影ID来连接,我们在map阶段就实现两个表的连接,输出<评论时间,电影名称>,因为在map到reduce之间它会将key(评论时间)升序排列,在reduce阶段使用让Sort阶段的Key降序排列的比较器,使key从升序变为降序,然后使用for语句输出前十个value,这里调用了一个DateUtil工具类,把时间戳转换成年月日显示。
重点看重写map方法的代码
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
//这里获取的是ratings.txt,ratings.txt是大文件
//获取到txt文件中每一行的内容
String row = value.toString();
//将每一行的内容按:进行切分
String[] values = row.split(":");
//从values数组中得到电影id
String movies_id = values[1];//电影id
//从values数组中得到评分时间
int time = Integer.parseInt(values[3]);//评分时间
//moviesMap里面是存放着movies.txt的内容
//获取movies.txt中每一行的信息,即电影ID:电影名称:电影类别
//从这里开始大表就开始变遍历小表中的内容,找到相同的关键字做连接,当你读取小表movies.txt文件中的内容的时候,连接就开始了
String movies_infor = moviesMap.getOrDefault(movies_id, "");//获取所有电影信息
//获取电影名
String movies_name =movies_infor.split(":")[1];
k.set(movies_name);
//这里输出(评论时间:电影名)
context.write(new IntWritable(time),k);
// System.out.println(time);
// System.out.println(k);
}
下面给出问题的完整代码:
package cn.neu.connection.first;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.*;
import java.lang.reflect.Array;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import static cn.neu.connection.first.DateUtil.timeStamp2Date;
public class MapSideJoin {
public static class MyMapper extends Mapper
上面内容都是个人见解,有不对的地方望指出!!!