customers.csv格式
1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521
2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126
3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,725
......
orders.csv格式
1,2013/7/25 0:00,11599,CLOSED
2,2013/7/25 0:00,256,PENDING_PAYMENT
3,2013/7/25 0:00,12111,COMPLETE
......
CustomerOrder.java
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class CustomerOrder implements WritableComparable {
private Integer customerId; // 客户编号
private String customerName; // 客户姓名
private Integer orderId; // 订单编号
private String orderStatus; // 订单状态
private String flag; // 用订单日期作为区别是customer.csv,还是order.csv
@Override
public String toString() {
return "CustomerOrder{" +
"customerId=" + customerId +
", customerName='" + customerName + ''' +
", orderId=" + orderId +
", orderStatus='" + orderStatus + ''' +
", flag='" + flag + ''' +
'}';
}
public CustomerOrder(Integer customerId, String customerName, Integer orderId, String orderStatus, String flag) {
this.customerId = customerId;
this.customerName = customerName;
this.orderId = orderId;
this.orderStatus = orderStatus;
this.flag = flag;
}
public String getFlag() {
return flag;
}
public void setFlag(String flag) {
this.flag = flag;
}
public Integer getCustomerId() {
return customerId;
}
public void setCustomerId(Integer customerId) {
this.customerId = customerId;
}
public String getCustomerName() {
return customerName;
}
public void setCustomerName(String customerName) {
this.customerName = customerName;
}
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public String getOrderStatus() {
return orderStatus;
}
public void setOrderStatus(String orderStatus) {
this.orderStatus = orderStatus;
}
public CustomerOrder() {
}
@Override
public int compareTo(CustomerOrder o) {
return 0;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(customerId);
dataOutput.writeUTF(customerName);
dataOutput.writeInt(orderId);
dataOutput.writeUTF(orderStatus);
dataOutput.writeUTF(flag);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.customerId = dataInput.readInt();
this.customerName = dataInput.readUTF();
this.orderId = dataInput.readInt();
this.orderStatus = dataInput.readUTF();
this.flag = dataInput.readUTF();
}
}
JoinMapper.java
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
 * Map side of the reduce-side join. Reads lines from either customers.csv
 * or orders.csv (distinguished by the split's file name, captured in
 * {@link #setup}) and emits {@code (customerId, CustomerOrder)} pairs.
 *
 * The original extended raw {@code Mapper}, so the typed
 * {@code map(LongWritable, Text, Context)} did not override the framework
 * method and would not compile with {@code @Override}; the generic
 * parameters below fix that.
 */
public class JoinMapper extends Mapper<LongWritable, Text, Text, CustomerOrder> {
    private String name = "";
    // Hadoop allows reusing one output bean per mapper: the framework
    // serializes the value on each context.write call.
    private final CustomerOrder customerOrder = new CustomerOrder();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        System.out.println(inputSplit.getPath().toString());
        name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        System.out.println("JoinMapper " + value.toString());
        if (name.startsWith("order")) {
            // orders.csv: orderId,orderDate,customerId,status — no customer name.
            customerOrder.setCustomerId(Integer.parseInt(fields[2]));
            customerOrder.setCustomerName("");
            customerOrder.setOrderId(Integer.parseInt(fields[0]));
            customerOrder.setOrderStatus(fields[3]);
            customerOrder.setFlag(fields[1]); // order date doubles as the "order" marker
        } else {
            // customers.csv: customerId,firstName,... — no order info.
            customerOrder.setCustomerId(Integer.parseInt(fields[0]));
            customerOrder.setCustomerName(fields[1]);
            customerOrder.setOrderId(-1);
            customerOrder.setOrderStatus("");
            customerOrder.setFlag("0"); // "0" marks a customer record for the reducer
        }
        Text text = new Text(customerOrder.getCustomerId().toString());
        context.write(text, customerOrder);
    }
}
JoinPartitioner.java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Routes records by customer id so that all rows for one customer land in
 * the same reducer.
 *
 * The original extended raw {@code Partitioner}, so
 * {@code getPartition(Text, CustomerOrder, int)} did not override the
 * framework's erased {@code getPartition(Object, Object, int)}; the
 * generic parameters fix that. The hard-coded mod-2 is also generalized to
 * the configured partition count (identical behavior with the driver's
 * two reduce tasks).
 */
public class JoinPartitioner extends Partitioner<Text, CustomerOrder> {
    @Override
    public int getPartition(Text text, CustomerOrder customerOrder, int numPartitions) {
        int customerId = Integer.parseInt(text.toString());
        // Ids are positive in the data; the extra math keeps the result
        // non-negative even for a negative id.
        return (customerId % numPartitions + numPartitions) % numPartitions;
    }
}
JoinReducer.java
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
/**
 * Reduce side of the join: for each customer id, finds the single customer
 * record (flag == "0") among the values, then stamps its name onto every
 * order record and emits the joined rows.
 *
 * The original extended raw {@code Reducer} with a raw {@code Iterable},
 * so the typed {@code reduce} did not override the framework method; the
 * generic parameters fix that. Copy failures were also silently swallowed
 * (printStackTrace), which would emit corrupt join rows — they now fail
 * the task with an IOException.
 */
public class JoinReducer extends Reducer<Text, CustomerOrder, CustomerOrder, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<CustomerOrder> values, Context context)
            throws IOException, InterruptedException {
        CustomerOrder customer = new CustomerOrder();
        List<CustomerOrder> orders = new ArrayList<>();
        // Hadoop reuses one CustomerOrder instance across the iteration,
        // so each value must be deep-copied before being kept.
        for (CustomerOrder customerOrder : values) {
            try {
                if (customerOrder.getFlag().equals("0")) {
                    // Customer record — at most one per key.
                    BeanUtils.copyProperties(customer, customerOrder);
                } else {
                    // Order record — collect for the join below.
                    CustomerOrder order = new CustomerOrder();
                    BeanUtils.copyProperties(order, customerOrder);
                    orders.add(order);
                }
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IOException("failed to copy CustomerOrder for key " + key, e);
            }
        }
        // Emit each order enriched with the customer's name.
        for (CustomerOrder order : orders) {
            order.setCustomerName(customer.getCustomerName());
            context.write(order, NullWritable.get());
        }
    }
}
JoinDriver.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class JoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration=new Configuration();
Job job=Job.getInstance(configuration);
job.setJarByClass(JoinDriver.class);
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CustomerOrder.class);
job.setPartitionerClass(JoinPartitioner.class);
job.setNumReduceTasks(2);
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(CustomerOrder.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path("G:\kgc\KB15\code\hadoopstu\in\demo8"));
Path path = new Path("G:\kgc\KB15\code\hadoopstu\out8");
FileSystem fs=FileSystem.get(path.toUri(),configuration);
if (fs.exists(path)) {
fs.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
job.waitForCompletion(true);
}
}
运行结果
CustomerOrder{customerId=10, customerName='Melissa', orderId=56133, orderStatus='COMPLETE', flag='2014/7/15 0:00'}
CustomerOrder{customerId=10, customerName='Melissa', orderId=45239, orderStatus='COMPLETE', flag='2014/5/1 0:00'}
CustomerOrder{customerId=100, customerName='George', orderId=64426, orderStatus='PENDING', flag='2014/4/6 0:00'}
CustomerOrder{customerId=100, customerName='George', orderId=54995, orderStatus='COMPLETE', flag='2014/7/8 0:00'}
CustomerOrder{customerId=100, customerName='George', orderId=15045, orderStatus='PROCESSING', flag='2013/10/28 0:00'}
......