栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

MapReduce左外连接

MapReduce左外连接

Hadoop-MapReduce左外连接(通用型) 来源篇 书名

数据算法-Hadoop/Spark大数据处理技巧

作者

Mahmoud Parsian著;

苏金国 杨健康两位老师翻译的一本书

序号

书中96页的左外连接介绍

原理篇

两张表T1和T2,T1与T2在连接键Key的作用下,相交的部分包含两张表的完整属性,不相交的部分,包含T1的所有属性,T2的属性则全为Null值。

此处左表为用户表

数据源 用户表(用户Id,交易地址)
u9242,CA
u4671,UT
u4195,UT
u3836,GA
u7507,CA
u3177,UT
u6598,CA
u2283,CA
u4815,CA
u7403,GA
u5967,GA
u3028,CA
u6277,CA
u6791,CA
交易表(交易id,商品id,用户id,商品数量,单价)
t8414,p5885,u5830,4,147
t5213,p9339,u5670,3,274
t7187,p8204,u8341,8,202
t1039,p1637,u8589,7,52
t861,p3175,u2609,0,342
t5492,p8767,u8417,2,330
t4467,p3445,u1200,0,253
t89,p7055,u8402,2,61
t6323,p9525,u2155,8,33
t3655,p9556,u3555,8,280
t6377,p9949,u7538,1,209
t766,p2543,u7881,9,14
t5667,p5268,u8852,7,286
关键实现点

MapReduce的规约器是根据Key的compareTo方法进行排序和合并操作**(等于0时)**

故本次实现定义了JoinKeyWritable来替换书中的PairOfString**(主要是找不到)**

这次Join时的key为userId,故JoinKey的compareTo方法应该在userId相等时传递到同一个规约器的value迭代器中(Iterable values)

左外连接目标

用户表左外连接交易表

得到所有用户交易的商品以及用户的地址

实现类
类名类描述
UserMapper用户信息映射类
TransactionMapper交易信息映射类
ProductLocationReducer用户信息和交易信息规约类
LeftOuterJoin功能聚合类
LeftOuterJoinTest测试类
JoinKeyWritable映射阶段输出自定义类
源码篇 文件生成类

模拟生成user_id和transaction_id两个文件,方便模拟

import lombok.Data;
import org.junit.Test;

import java.io.*;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;


public class LeftOuterJoinGenerate {
    @Data
    public static class UserInfo {
        
        private String userId;
        
        private String locationId;

        public static List getLocationList() {
            return Stream.of("UT", "GA", "CA").collect(Collectors.toList());
        }

        @Override
        public String toString() {
            return String.join(",", userId, locationId);
        }
    }

    @Data
    public static class TransactionInfo {
        
        private String transactionId;
        
        private String productId;
        
        private String userId;
        
        private int quantity;
        
        private int amount;

        @Override
        public String toString() {
            return String.format("%s,%s,%s,%s,%s", transactionId, productId, userId, quantity, amount);
        }
    }

    @Test
    public void generateUserInfoList() throws IOException {
        File userInfoFile = new File("D:\Destop\hadooplearn\src\data\user_info");
        FileOutputStream fileOutputStream = new FileOutputStream(userInfoFile);
        System.out.println(userInfoFile.getAbsolutePath());
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
        int userNum = 10000;
        for (int i = 0; i < userNum; i++) {
            UserInfo userInfo = new UserInfo();
            userInfo.setUserId(String.format("u%s", (int) (Math.random() * userNum)));
            List locationList = UserInfo.getLocationList();
            userInfo.setLocationId(locationList.get((int) (Math.random() * locationList.size())));
            bufferedOutputStream.write(String.format("%sn", userInfo.toString()).getBytes());
        }
        bufferedOutputStream.close();
        fileOutputStream.close();
    }

    @Test
    public void generateTransaction() throws IOException {
        File userInfoFile = new File("D:\Destop\hadooplearn\src\data\transaction_info");
        FileOutputStream fileOutputStream = new FileOutputStream(userInfoFile);
        System.out.println(userInfoFile.getAbsolutePath());
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
        int transactionNum = 10000;
        int userNum = 10000;
        int productNum = 10000;
        for (int i = 0; i < transactionNum; i++) {
            TransactionInfo transactionInfo = new TransactionInfo();
            transactionInfo.setTransactionId(String.format("t%s", (int) (Math.random() * transactionNum)));
            transactionInfo.setProductId(String.format("p%s", (int) (Math.random() * productNum)));
            transactionInfo.setUserId(String.format("u%s", (int) (Math.random() * userNum)));
            transactionInfo.setQuantity((int) (Math.random() * 10));
            transactionInfo.setAmount((int) (Math.random() * 400));
            bufferedOutputStream.write(String.format("%sn", transactionInfo.toString()).getBytes());
        }
        bufferedOutputStream.close();
        fileOutputStream.close();
    }
}
用户信息映射器类(内部类)
    
    public static class UserMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String inputString = value.toString();
            String[] split = inputString.split(",");
            String userId = split[0];
            String location = split[1];
            JoinKeyWritable outputKey = new JoinKeyWritable();
            outputKey.setName(userId);
            outputKey.setValue("1");
            JoinKeyWritable outputValue = new JoinKeyWritable();
            outputValue.setName("location");
            outputValue.setValue(location);
            context.write(outputKey, outputValue);
        }
    }

交易信息映射器类(内部类)
public static class TransactionMapper extends Mapper {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String inputString = value.toString();
        String[] split = inputString.split(",");
        String product = split[1];
        String userId = split[2];
        JoinKeyWritable outputKey = new JoinKeyWritable();
        outputKey.setName(userId);
        outputKey.setValue("2");
        JoinKeyWritable outputValue = new JoinKeyWritable();
        outputValue.setName("product");
        outputValue.setValue(product);
        context.write(outputKey, outputValue);
    }
}
用户信息和交易信息规约连接类(内部类)
public static class ProductLocationReducer extends Reducer {
    @Override
    protected void reduce(JoinKeyWritable key, Iterable values, Context context) throws IOException, InterruptedException {
        ArrayList locationList = new ArrayList<>();
        boolean hasProduct = false;
        for (JoinKeyWritable keyWritable : values) {
            String name = keyWritable.getName();
            if ("location".equals(name)) {
                locationList.add(keyWritable.getValue());
            } else {
                hasProduct = true;
                for (String aLocationList : locationList) {
                    context.write(new Text(aLocationList), new Text(keyWritable.getValue()));
                }
            }
        }
        // 如果没有商品,那么左外连接为空
        if (!hasProduct) {
            for (String aLocationList : locationList) {
                context.write(new Text(aLocationList), new Text("null"));
            }
        }
    }
}
左连接MapReduce流程驱动方法
public void LeftJoinDriver(String userPath, String transactionPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "LeftJoinDriver");
    Path userFilePath = new Path(userPath);
    Path transactionFilePath = new Path(transactionPath);
    MultipleInputs.addInputPath(job, userFilePath, TextInputFormat.class, UserMapper.class);
    MultipleInputs.addInputPath(job, transactionFilePath, TextInputFormat.class, TransactionMapper.class);
    job.setMapOutputKeyClass(JoinKeyWritable.class);
    job.setMapOutputValueClass(JoinKeyWritable.class);
    job.setReducerClass(ProductLocationReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.waitForCompletion(true);
}
中间输出键类
import lombok.Data;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


@Data
public class JoinKeyWritable implements WritableComparable {
    private String name;
    private String value;

    @Override
    public int compareTo(JoinKeyWritable o) {
        return this.name.compareTo(o.name);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.value = in.readUTF();
    }
}
功能聚合类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import writable.JoinKeyWritable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;


public class LeftOuterJoin {
    
    public static class UserMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String inputString = value.toString();
            String[] split = inputString.split(",");
            String userId = split[0];
            String location = split[1];
            JoinKeyWritable outputKey = new JoinKeyWritable();
            outputKey.setName(userId);
            outputKey.setValue("1");
            JoinKeyWritable outputValue = new JoinKeyWritable();
            outputValue.setName("location");
            outputValue.setValue(location);
            context.write(outputKey, outputValue);
        }
    }

    
    public static class TransactionMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String inputString = value.toString();
            String[] split = inputString.split(",");
            String product = split[1];
            String userId = split[2];
            JoinKeyWritable outputKey = new JoinKeyWritable();
            outputKey.setName(userId);
            outputKey.setValue("2");
            JoinKeyWritable outputValue = new JoinKeyWritable();
            outputValue.setName("product");
            outputValue.setValue(product);
            context.write(outputKey, outputValue);
        }
    }

    
    public static class ProductLocationReducer extends Reducer {
        @Override
        protected void reduce(JoinKeyWritable key, Iterable values, Context context) throws IOException, InterruptedException {
            ArrayList locationList = new ArrayList<>();
            boolean hasProduct = false;
            for (JoinKeyWritable keyWritable : values) {
                String name = keyWritable.getName();
                if ("location".equals(name)) {
                    locationList.add(keyWritable.getValue());
                } else {
                    hasProduct = true;
                    for (String aLocationList : locationList) {
                        context.write(new Text(aLocationList), new Text(keyWritable.getValue()));
                    }
                }
            }
            // 如果没有商品,那么左外连接为空
            if (!hasProduct) {
                for (String aLocationList : locationList) {
                    context.write(new Text(aLocationList), new Text("null"));
                }
            }
        }
    }

    
    public static class LocationCountMapper extends Mapper {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String inputString = value.toString();
            String[] split = inputString.split("t");
            String location = split[0];
            String product = split[1];
            context.write(new Text(product), new Text(location));
        }
    }

    
    public static class LocationCountReducer extends Reducer {
        @Override
        protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            HashSet locationSet = new HashSet<>();
            for (Text location : values) {
                locationSet.add(location);
            }
            context.write(key, new IntWritable(locationSet.size()));
        }
    }

    
    public void LeftJoinDriver(String userPath, String transactionPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "LeftJoinDriver");
        Path userFilePath = new Path(userPath);
        Path transactionFilePath = new Path(transactionPath);
        MultipleInputs.addInputPath(job, userFilePath, TextInputFormat.class, UserMapper.class);
        MultipleInputs.addInputPath(job, transactionFilePath, TextInputFormat.class, TransactionMapper.class);
        job.setMapOutputKeyClass(JoinKeyWritable.class);
        job.setMapOutputValueClass(JoinKeyWritable.class);
        job.setReducerClass(ProductLocationReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }

    public void LocationCountDriver(String inputPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "LocationCountDriver");
        job.setMapperClass(LocationCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setReducerClass(LocationCountReducer.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.waitForCompletion(true);
    }
}
测试启动类(注意hdfs的命名空间等环境区别)
import org.junit.Test;

import java.io.IOException;


public class LeftOuterJoinTest {
    @Test
    public void testLeftOuterJoin() throws InterruptedException, IOException, ClassNotFoundException {
        LeftOuterJoin leftOuterJoin = new LeftOuterJoin();
        leftOuterJoin.LeftJoinDriver("D:\Destop\hadooplearn\src\data\user_info",
                "D:\Destop\hadooplearn\src\data\transaction_info",
                "hdfs://spark01:9000/test/leftOuterJoinResult");
    }

    @Test
    public void testLocationCountDriver() throws InterruptedException, IOException, ClassNotFoundException {
        LeftOuterJoin leftOuterJoin = new LeftOuterJoin();
        leftOuterJoin.LocationCountDriver("hdfs://spark01:9000/test/leftOuterJoinResult",
                "hdfs://spark01:9000/test/locationCountResult");
    }
}

运行截图 用户信息数据

交易信息数据

运行成功数据

总结

总体来说,左连接的实现还是特别复杂的。而且本次实现在Spark的Join策略中属于时间复杂度最高的NPJ方式(双重for循环实现)。

如果有厉害的XD可以挑战一下MapReduce版本的**SMJ(排序双指针)和HJ(哈希)**方式。

谢谢各位,奈何拙笔水平有限,有问题望各位指点迷津。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/618684.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号