栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce使用Map-side Join实现连表查询

MapReduce使用Map-side Join实现连表查询

原理:在map task执行的时候,会先小表的数据读入memory中,每次在map函数遍历大表的时候,就会查找memory中对应相同join key的记录集,然后和大表中的数据连接,其实Map-side Join连表和sql语句中的join是一样的。你可以这样理解:在map处理大文件后,读取小文件的时候,他就会做连接。

适用场景:Map-side Join适用场景是一个大表和一个小表的连接操作,其中,“小表”是指文件足够小,可以加载到内存中。该算法可以将join算子执行在Map端,无需经历shuffle和reduce等阶段,因此效率非常高。

废话不多说,直接用例子说明:

两个txt文件:(数据太多,需要数据可以私聊我)

movies.txt:电影ID:电影名称:电影类别

ratings.txt:用户ID:电影ID:用户评分:评分时间

 

每个用户会对一个或者多个电影进行评分

问题:找出评论时间最晚的10条评论,输出电影名和评论时间

输出样例:

电影名称 评分时间

 

很明显,这两个表是通过电影ID来连接,我们在map阶段就实现两个表的连接,输出<评论时间,电影名称>,因为在map到reduce之间它会将key(评论时间)升序排列,在reduce阶段使用让Sort阶段的Key降序排列的比较器,使key从升序变为降序,然后使用for语句输出前十个value,这里调用了一个DateUtil工具类,把时间戳转换成年月日显示。

重点看重写map方法的代码

        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            //这里获取的是ratings.txt,ratings.txt是大文件
            //获取到txt文件中每一行的内容
            String row = value.toString();
            //将每一行的内容按:进行切分
            String[] values = row.split(":");
            //从values数组中得到电影id
            String movies_id = values[1];//电影id
            //从values数组中得到评分时间
            int time = Integer.parseInt(values[3]);//评分时间
            //moviesMap里面是存放着movies.txt的内容
            //获取movies.txt中每一行的信息,即电影ID:电影名称:电影类别
            //从这里开始大表就开始变遍历小表中的内容,找到相同的关键字做连接,当你读取小表movies.txt文件中的内容的时候,连接就开始了
            String movies_infor = moviesMap.getOrDefault(movies_id, "");//获取所有电影信息
            //获取电影名
            String movies_name =movies_infor.split(":")[1];
            k.set(movies_name);
            //这里输出(评论时间:电影名)
            context.write(new IntWritable(time),k);
//            System.out.println(time);
//            System.out.println(k);
        }

 下面给出问题的完整代码:

package cn.neu.connection.first;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.*;
import java.lang.reflect.Array;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;

import static cn.neu.connection.first.DateUtil.timeStamp2Date;

public class MapSideJoin {

    public static class MyMapper extends Mapper {
        public Text k = new Text();
        //读取缓存文件,并按照文件名称读取到map或者别的数据结构中
        //定义一个存储movies数据缓存的数据结构
        Map moviesMap = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            //获取文件系统的操作对象,通过这个对象来获取小表的数据
            FileSystem fs = FileSystem.get(context.getConfiguration());
            Path path = new Path("movies.txt");
            FSDataInputStream open = fs.open(path);
            BufferedReader br = new BufferedReader(new InputStreamReader(open));
            String line;
            while((line=br.readLine())!=null){
                //把小表数据读取到内存,也就是放入HashMap中
                //因为HashMap被创建后停留在内存中,然后随着程序的结束而消失
                String[] movies_name=line.split(":");
                moviesMap.put(movies_name[0],line);
            }
        }


        //读取login.txt进行处理
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            //这里获取的是ratings.txt
            String row = value.toString();

            String[] values = row.split(":");
            String movies_id = values[1];//电影id
            int time = Integer.parseInt(values[3]);//评论时间
            String movies_infor = moviesMap.getOrDefault(movies_id, "");//获取所有电影信息
            //1097:E.T. the Extra-Terrestrial (1982):Children's|Drama|Fantasy|Sci-Fi
            String movies_name =movies_infor.split(":")[1];

//            System.out.println(movies_infor);
            k.set(movies_name);
            //这里输出(评论时间:电影名)
            context.write(new IntWritable(time),k);
            System.out.println(time);
            System.out.println(k);
        }
    }

    public static class MyReducer extends Reducer {
        int i = 1;
        @Override
        protected void reduce(IntWritable key, Iterable values, Context context) throws IOException, InterruptedException {

            //因为在map到reduce之间它会将key(评论时间)升序排列,我们使用使Sort阶段的Key降序排列的比较器后,
            // 在reduce得到的key就是降序排列的,我们直接输出前十个数据就好了
            if (i < 10) {
                for (Text value : values) {
                    String date = timeStamp2Date(String.valueOf(key), "yyyy-MM-dd HH:mm:ss");
                    context.write(value, new Text(date));
                }
                i++;
            }
        }

        //使Sort阶段的Key降序排列的比较器
        public static class IntWritableDecreasingComparator extends
                IntWritable.Comparator {
            public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
                return -super.compare(b1, s1, l1, b2, s2, l2);
            }
        }

        public static void main(String[] args) throws IOException {
            try {
                //Configuration conf = HdfsUtils.getConf();
                Job job = Job.getInstance(new Configuration());
                job.setJarByClass(MapSideJoin.class);
                job.setMapperClass(MyMapper.class);
                job.setMapOutputKeyClass(IntWritable.class);
                job.setMapOutputValueClass(Text.class);

                job.setReducerClass(MyReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                //设置Sort阶段使用比较器
                job.setSortComparatorClass(IntWritableDecreasingComparator.class);
                //设置缓存
                job.addCacheFile(new URI("movies.txt"));

                FileInputFormat.addInputPath(job, new Path("ratings.txt"));
                FileOutputFormat.setOutputPath(job, new Path("output"));
                FileSystem fileSystem = FileSystem.get( new Configuration());
                if (fileSystem.exists(new Path("output"))) {//判断输出文件是否存在
                    fileSystem.delete(new Path("output"), true); //存在则删除文件,否则会出错
                }

                int success = job.waitForCompletion(true) ? 0 : 1;
                System.exit(success);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
        }
    }
}

上面内容都是个人见解,有不对的地方望指出!!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/699517.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号