栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

MapReduce编程Join的多种应用之通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

MapReduce编程Join的多种应用之通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联

customers.csv格式
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,725
......
orders.csv格式
1,2013/7/25 0:00,11599,CLOSED
2,2013/7/25 0:00,256,PENDING_PAYMENT
3,2013/7/25 0:00,12111,COMPLETE
......
CustomerOrder.java
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CustomerOrder implements WritableComparable {

    private Integer customerId; // 客户编号
    private String customerName; // 客户姓名
    private Integer orderId; // 订单编号
    private String orderStatus;  // 订单状态
    private String flag; // 用订单日期作为区别是customer.csv,还是order.csv

    @Override
    public String toString() {
        return "CustomerOrder{" +
                "customerId=" + customerId +
                ", customerName='" + customerName + ''' +
                ", orderId=" + orderId +
                ", orderStatus='" + orderStatus + ''' +
                ", flag='" + flag + ''' +
                '}';
    }

    public CustomerOrder(Integer customerId, String customerName, Integer orderId, String orderStatus, String flag) {
        this.customerId = customerId;
        this.customerName = customerName;
        this.orderId = orderId;
        this.orderStatus = orderStatus;
        this.flag = flag;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    public Integer getCustomerId() {
        return customerId;
    }

    public void setCustomerId(Integer customerId) {
        this.customerId = customerId;
    }

    public String getCustomerName() {
        return customerName;
    }

    public void setCustomerName(String customerName) {
        this.customerName = customerName;
    }

    public Integer getOrderId() {
        return orderId;
    }

    public void setOrderId(Integer orderId) {
        this.orderId = orderId;
    }

    public String getOrderStatus() {
        return orderStatus;
    }

    public void setOrderStatus(String orderStatus) {
        this.orderStatus = orderStatus;
    }

    public CustomerOrder() {
    }

    @Override
    public int compareTo(CustomerOrder o) {
        return 0;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(customerId);
        dataOutput.writeUTF(customerName);
        dataOutput.writeInt(orderId);
        dataOutput.writeUTF(orderStatus);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.customerId = dataInput.readInt();
        this.customerName = dataInput.readUTF();
        this.orderId = dataInput.readInt();
        this.orderStatus = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }
}
JoinMapper.java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class JoinMapper extends Mapper {

    String name="";
    CustomerOrder customerOrder= new CustomerOrder();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        System.out.println(inputSplit.getPath().toString());
        name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        System.out.println("JoinMapper  "+value.toString());
        if (name.startsWith("order")) {// 订单表 订单表不包含用户信息
            customerOrder.setCustomerId(Integer.parseInt(fields[2]));
            customerOrder.setCustomerName("");
            customerOrder.setOrderId(Integer.parseInt(fields[0]));
            customerOrder.setOrderStatus(fields[3]);
            customerOrder.setFlag(fields[1]);
        }else {// 顾客表 顾客表中不包含订单信息
            customerOrder.setCustomerId(Integer.parseInt(fields[0]));
            customerOrder.setCustomerName(fields[1]);
            customerOrder.setOrderId(-1);
            customerOrder.setOrderStatus("");
            customerOrder.setFlag("0");
        }
        Text text=new Text(customerOrder.getCustomerId().toString());

        context.write(text,customerOrder);
    }
}
JoinPartitioner.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class JoinPartitioner extends Partitioner {
    @Override
    public int getPartition(Text text, CustomerOrder customerOrder, int i) {
        Integer customerId = Integer.parseInt(text.toString());
        if (customerId%2==0) {
            return 0;
        }else {
            return 1;
        }
    }
}
JoinReducer.java
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

public class JoinReducer extends Reducer {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        CustomerOrder customer = new CustomerOrder();
        List orders = new ArrayList<>();
        for (CustomerOrder customerOrder:
             values) {
            if (customerOrder.getFlag().equals("0")) { // 顾客    只有一个
                try {
                    BeanUtils.copyProperties(customer,customerOrder);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }else { // 订单   其他都是订单
                CustomerOrder order = new CustomerOrder();
                try {
                    BeanUtils.copyProperties(order,customerOrder);
                    orders.add(order);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        for (CustomerOrder order:
             orders) {
            order.setCustomerName(customer.getCustomerName());
            context.write(order,NullWritable.get());
        }
    }
}
JoinDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class JoinDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration=new Configuration();
        Job job=Job.getInstance(configuration);

        job.setJarByClass(JoinDriver.class);

        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrder.class);

        job.setPartitionerClass(JoinPartitioner.class);
        job.setNumReduceTasks(2);

        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(CustomerOrder.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path("G:\kgc\KB15\code\hadoopstu\in\demo8"));

        Path path = new Path("G:\kgc\KB15\code\hadoopstu\out8");
        FileSystem fs=FileSystem.get(path.toUri(),configuration);
        if (fs.exists(path)) {
            fs.delete(path,true);
        }
        FileOutputFormat.setOutputPath(job,path);

        job.waitForCompletion(true);
    }
}
运行结果

CustomerOrder{customerId=10, customerName='Melissa', orderId=56133, orderStatus='COMPLETE', flag='2014/7/15 0:00'}
CustomerOrder{customerId=10, customerName='Melissa', orderId=45239, orderStatus='COMPLETE', flag='2014/5/1 0:00'}
CustomerOrder{customerId=100, customerName='George', orderId=64426, orderStatus='PENDING', flag='2014/4/6 0:00'}
CustomerOrder{customerId=100, customerName='George', orderId=54995, orderStatus='COMPLETE', flag='2014/7/8 0:00'}
CustomerOrder{customerId=100, customerName='George', orderId=15045, orderStatus='PROCESSING', flag='2013/10/28 0:00'}
......
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/435122.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号