栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

hadoop中join操作

hadoop中join操作

前言

在mysql中,经常涉及到2张表或者多张表的关联查询,通常通过中间字段将两个表做关联,在MapReduce中,某些场景下也会遇到类似的需求,比如说,将两个原始的日志文件,通过中间业务字段进行关联,然后重新输出为一个新的文件

如下图所示,左边可理解为一张订单表,右边可理解为与之对应的商品表,相信大家一眼就能明白

现在提出需求,将这两个文件的数据,通过MapReduce输出得到下面这个格式的文件,该怎么做呢?

如果在mysql中,只需要将订单表和产品表通过产品ID做一下关联就可以得出结果,通过MapReduce该如何实现呢?

实现思路

由于整个MapReduce的过程仍然是存在的,其实只需要想明白,要实现这种业务,Map阶段需要做什么,Reduce阶段做什么就可以了

现在来思考下,就像使用mysql做关联一样,肯定有一张表做驱动表,肯定要从这两个文件中选取一个关联字段进行关联

那么很容易联想到,Map阶段中是把读取进来的数据按照一行行读取,分割单词,不同列的单词代表着不同的含义,就需要在输出出去之前,把最终格式的结果封装好,但是这里的问题是,map方法中的输入参数只能来自于一个文件,另一个文件的数据怎么加载呢?

换句话说,在map方法中要完成结果的组装,肯定需要有两张表的关联操作,那么就容易理解了,肯定是在进入map方法之前,另一个文件的数据要加载到JVM内存中,然后在map方法中直接取出来进行关联操作即可

以上的思路就是本篇接下来要通过代码实现的一种思路,即mapjoin

mapjoin解决办法

有了上面的解决思路就好办了,在MapReduce中,可以通过下面的这行代码,在job类中加进去之后,再在自定义mapper的setup方法中就可以读取进来

job.addCacheFile()

于是很自然的联想到,在map阶段的setup方法中,将product文件中的数据读取并封装成map ,即 <产品编号,产品名称> -> <01,苹果> ,然后再在map方法中解析并匹配数据,最后输出即可

编码实现

1、自定义Mapper

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class JoinMapper extends Mapper {

    private Map productMap = new HashMap<>();
    private Text text = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //通过job中设置的缓存文件得到pd.txt数据
        URI[] cacheFiles = context.getCacheFiles();
        Path path = new Path(cacheFiles[0]);
        //获取文件输入流
        FileSystem fs = FileSystem.get(context.getConfiguration());
        FSDataInputStream fis = fs.open(path);

        //通过包装流转换为reader
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));

        //逐行读取
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            //切分为一行数据
            //01	苹果
            String[] split = line.split("t");
            productMap.put(split[0], split[1]);
        }
        //关流
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1001	01	1
        String[] fields = value.toString().split("t");

        //从map里面取出pname
        String pname = productMap.get(fields[1]);

        //数据拼接
        String result = fields[0] + "t" + pname + "t" + fields[2];
        text.set(result);

        context.write(text,NullWritable.get());
    }


}

2、自定义Reducer

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class JoinReducer extends Reducer {

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        for (NullWritable nullWritable : values){
            context.write(key,NullWritable.get());
        }
    }

}

3、job类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class JoinJob {

    public static void main(String[] args) throws Exception {

        //1、获取job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        //2、设置jar路径
        job.setJarByClass(JoinJob.class);

        //3、关联mapper 和 Reducer
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);

        //4、设置 map输出的 key/val 的类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        //5、设置最终输出的key / val 类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 加载缓存数据
        job.addCacheFile(new URI("file:///F:/网盘/csv/order/product.txt"));
        // Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0

        //6、设置最终的输出路径
        String inputPath = "F:\网盘\csv\order\order.txt";
        String outPath = "F:\网盘\csv\order\result1";
        FileInputFormat.setInputPaths(job,new Path(inputPath));
        FileOutputFormat.setOutputPath(job,new Path(outPath));

        // 7 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

最后运行一下job类,观察输出结果,可以看到,文件格式即为本文开篇期望的效果

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/700262.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号