首先准备两个CSV表单
我第一个表单为customer
第二个表单为order
接着实现两表连接查询
首先写一个实体类CustomerOrder。定义变量
package cn.kgc.kb15.demo05; import org.apache.hadoop.io.WritableComparable; import javax.xml.crypto.Data; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; //这里实现WritableComparable接口,并且重载方法 public class CustomerOrder implements WritableComparable{ private Integer customerId; private String customerName; private Integer orderId; private String orderStatus; private String flag;//订单日期作为区别是订单表还是顾客表 //下面实现get,set方法。tostring方法,构造方法 @Override public String toString() { return "CustomerOrder{" + "customerId=" + customerId + ", customerName='" + customerName + ''' + ", orderId=" + orderId + ", orderStatus='" + orderStatus + ''' + ", flag='" + flag + ''' + '}'; } public Integer getCustomerId() { return customerId; } public void setCustomerId(Integer customerId) { this.customerId = customerId; } public String getCustomerName() { return customerName; } public void setCustomerName(String customerName) { this.customerName = customerName; } public Integer getOrderId() { return orderId; } public void setOrderId(Integer orderId) { this.orderId = orderId; } public String getOrderStatus() { return orderStatus; } public void setOrderStatus(String orderStatus) { this.orderStatus = orderStatus; } public String getFlag() { return flag; } public void setFlag(String flag) { this.flag = flag; } public CustomerOrder() { } public CustomerOrder(Integer customerId, String customerName, Integer orderId, String orderStatus, String flag) { this.customerId = customerId; this.customerName = customerName; this.orderId = orderId; this.orderStatus = orderStatus; this.flag = flag; } //这里为了之后进行排名用 @Override public int compareTo(CustomerOrder o) { return 0; } //这里定义几个变量写几个,尽量顺序和开头写的一致 @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeInt(customerId); dataOutput.writeUTF(customerName); dataOutput.writeInt(orderId); dataOutput.writeUTF(orderStatus); dataOutput.writeUTF(flag); } @Override public void readFields(DataInput dataInput) throws 
IOException { this.customerId=dataInput.readInt(); this.customerName=dataInput.readUTF(); this.orderId=dataInput.readInt(); this.orderStatus=dataInput.readUTF(); this.flag=dataInput.readUTF(); } }
接着写Mapper类,编写JoinMapper类,重写两个方法
package cn.kgc.kb15.demo05; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import java.io.IOException; //Map类继承Mapper,输出Text和对象,重载Mapper两个方法 public class JoinMapper extends Mapper{ String name=""; CustomerOrder customerOrder=new CustomerOrder(); @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit inputSplit=(FileSplit)context.getInputSplit();//这边获取Driver类给的路径 System.out.println(inputSplit.getPath().toString()); name=inputSplit.getPath().getName();//这里得到当前路径代表的表单。比如获取customer } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] fields=value.toString().split(",");//value值就是当前表单里面所有的数据,根据逗号进行切割 System.out.println("Mapper: "+value.toString()); if (name.startsWith("order")){//订单表,里面不包含用户信息,1,2013/7/25 0:00,11599,CLOSED customerOrder.setCustomerId(Integer.parseInt(fields[2]));//对应切割后每一个对应的位置 customerOrder.setCustomerName(""); customerOrder.setOrderId(Integer.parseInt(fields[0])); customerOrder.setOrderStatus(fields[3]); customerOrder.setFlag(fields[1]); }else { //顾客表,里面不包含订单信息,1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521 customerOrder.setCustomerId(Integer.parseInt(fields[0])); customerOrder.setCustomerName(fields[1]); customerOrder.setOrderId(-1); customerOrder.setOrderStatus(""); customerOrder.setFlag("0"); } //输出的key值就为cutomer的id Text text=new Text(customerOrder.getCustomerId().toString()); context.write(text,customerOrder); } }
接着编写Reducer类,写个JoinReducer类
package cn.kgc.kb15.demo05; import org.apache.commons.beanutils.BeanUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; public class JoinReducer extends Reducer{ @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { CustomerOrder customer=new CustomerOrder(); List orders=new ArrayList<>(); for (CustomerOrder customerOrder : values) { if (customerOrder.getFlag().equals("0")){//代表顾客表 只有一个 try { BeanUtils.copyProperties(customer,customerOrder);//该语法就是将当前顾客表所有的信息复制给customer } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } ; }else {//否则就是订单表,其他都是订单 CustomerOrder order=new CustomerOrder(); try { BeanUtils.copyProperties(order,customerOrder);//将订单表所有的数据复制给order,并放入集合里 orders.add(order); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } } } for (CustomerOrder order : orders) { order.setCustomerName(customer.getCustomerName()); context.write(order,NullWritable.get()); } } }
最后是Driver类,编写JoinDriver
package cn.kgc.kb15.demo05;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class JoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(JoinDriver.class);
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CustomerOrder.class);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(CustomerOrder.class);
job.setOutputValueClass(NullWritable.class);
job.setPartitionerClass(JoinPartitioner.class);
job.setNumReduceTasks(3);
FileInputFormat.setInputPaths(job,new Path("D:\Java\Code\hadoopstu\in\demo4"));
Path pathOut=new Path("D:\Java\Code\hadoopstu\out6");
FileSystem fileSystem=FileSystem.get(pathOut.toUri(),conf);
if (fileSystem.exists(pathOut)){
fileSystem.delete(pathOut,true);
}
FileOutputFormat.setOutputPath(job,pathOut);
job.waitForCompletion(true);
}
}



