数据算法-Hadoop/Spark大数据处理技巧
作者:Mahmoud Parsian;
由苏金国、杨健康两位老师翻译。
序号书中96页的左外连接介绍
原理:两张表 T1(左表)和 T2(右表)按连接键 Key 做左外连接时,结果保留 T1 的全部记录:Key 能匹配上的记录同时带有两张表的完整属性;T1 中匹配不上的记录仍然保留,其 T2 侧属性全为 Null;而 T2 中匹配不上的记录会被丢弃。
此处左表为用户表
数据源 用户表(用户Id,交易地址)u9242,CA u4671,UT u4195,UT u3836,GA u7507,CA u3177,UT u6598,CA u2283,CA u4815,CA u7403,GA u5967,GA u3028,CA u6277,CA u6791,CA交易表(交易id,商品id,用户id,商品数量,单价)
t8414,p5885,u5830,4,147 t5213,p9339,u5670,3,274 t7187,p8204,u8341,8,202 t1039,p1637,u8589,7,52 t861,p3175,u2609,0,342 t5492,p8767,u8417,2,330 t4467,p3445,u1200,0,253 t89,p7055,u8402,2,61 t6323,p9525,u2155,8,33 t3655,p9556,u3555,8,280 t6377,p9949,u7538,1,209 t766,p2543,u7881,9,14 t5667,p5268,u8852,7,286关键实现点
MapReduce的规约器是根据Key的compareTo方法进行排序和合并操作**(等于0时)**
故本次实现定义了JoinKeyWritable来替换书中的PairOfString**(主要是找不到)**
这次Join时的key为userId,故JoinKey的compareTo方法应该在userId相等时传递到同一个规约器的value迭代器中(Iterable values)
左外连接目标用户表左外连接交易表
得到所有用户交易的商品以及用户的地址
实现类| 类名 | 类描述 |
|---|---|
| UserMapper | 用户信息映射类 |
| TransactionMapper | 交易信息映射类 |
| ProductLocationReducer | 用户信息和交易信息规约类 |
| LeftOuterJoin | 功能聚合类 |
| LeftOuterJoinTest | 测试类 |
| JoinKeyWritable | 映射阶段输出自定义类 |
模拟生成 user_info 和 transaction_info 两个数据文件,方便本地测试(文件名与下方代码中的输出路径保持一致)
import lombok.Data;
import org.junit.Test;
import java.io.*;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class LeftOuterJoinGenerate {
@Data
public static class UserInfo {
private String userId;
private String locationId;
public static List getLocationList() {
return Stream.of("UT", "GA", "CA").collect(Collectors.toList());
}
@Override
public String toString() {
return String.join(",", userId, locationId);
}
}
@Data
public static class TransactionInfo {
private String transactionId;
private String productId;
private String userId;
private int quantity;
private int amount;
@Override
public String toString() {
return String.format("%s,%s,%s,%s,%s", transactionId, productId, userId, quantity, amount);
}
}
@Test
public void generateUserInfoList() throws IOException {
File userInfoFile = new File("D:\Destop\hadooplearn\src\data\user_info");
FileOutputStream fileOutputStream = new FileOutputStream(userInfoFile);
System.out.println(userInfoFile.getAbsolutePath());
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
int userNum = 10000;
for (int i = 0; i < userNum; i++) {
UserInfo userInfo = new UserInfo();
userInfo.setUserId(String.format("u%s", (int) (Math.random() * userNum)));
List locationList = UserInfo.getLocationList();
userInfo.setLocationId(locationList.get((int) (Math.random() * locationList.size())));
bufferedOutputStream.write(String.format("%sn", userInfo.toString()).getBytes());
}
bufferedOutputStream.close();
fileOutputStream.close();
}
@Test
public void generateTransaction() throws IOException {
File userInfoFile = new File("D:\Destop\hadooplearn\src\data\transaction_info");
FileOutputStream fileOutputStream = new FileOutputStream(userInfoFile);
System.out.println(userInfoFile.getAbsolutePath());
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(fileOutputStream);
int transactionNum = 10000;
int userNum = 10000;
int productNum = 10000;
for (int i = 0; i < transactionNum; i++) {
TransactionInfo transactionInfo = new TransactionInfo();
transactionInfo.setTransactionId(String.format("t%s", (int) (Math.random() * transactionNum)));
transactionInfo.setProductId(String.format("p%s", (int) (Math.random() * productNum)));
transactionInfo.setUserId(String.format("u%s", (int) (Math.random() * userNum)));
transactionInfo.setQuantity((int) (Math.random() * 10));
transactionInfo.setAmount((int) (Math.random() * 400));
bufferedOutputStream.write(String.format("%sn", transactionInfo.toString()).getBytes());
}
bufferedOutputStream.close();
fileOutputStream.close();
}
}
用户信息映射器类(内部类)
public static class UserMapper extends Mapper<LongWritable, Text, JoinKeyWritable, JoinKeyWritable> {

    /**
     * 解析一行用户记录 "userId,location"。
     * 输出 key = (userId, "1"),value = ("location", location);
     * key 的第二列 "1" 标记该记录来自用户表(左表)。
     * 注:原文因 Markdown 转义丢失了 Mapper 的泛型参数,此处已补回。
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String inputString = value.toString();
        String[] split = inputString.split(",");
        String userId = split[0];
        String location = split[1];
        JoinKeyWritable outputKey = new JoinKeyWritable();
        outputKey.setName(userId);
        outputKey.setValue("1");
        JoinKeyWritable outputValue = new JoinKeyWritable();
        outputValue.setName("location");
        outputValue.setValue(location);
        context.write(outputKey, outputValue);
    }
}
交易信息映射器类(内部类)
public static class TransactionMapper extends Mapper用户信息和交易信息规约连接类(内部类){ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String inputString = value.toString(); String[] split = inputString.split(","); String product = split[1]; String userId = split[2]; JoinKeyWritable outputKey = new JoinKeyWritable(); outputKey.setName(userId); outputKey.setValue("2"); JoinKeyWritable outputValue = new JoinKeyWritable(); outputValue.setName("product"); outputValue.setValue(product); context.write(outputKey, outputValue); } }
public static class ProductLocationReducer extends Reducer<JoinKeyWritable, JoinKeyWritable, Text, Text> {

    /**
     * 对同一 userId 的 value 迭代器做左外连接。
     * 修正点:原实现假设 "location" 值一定先于 "product" 值到达,
     * 但 JoinKeyWritable.compareTo 只比较 userId,没有做二次排序,
     * 迭代器中两侧数据顺序不保证 —— 因此先把两侧全部缓冲,再统一输出。
     */
    @Override
    protected void reduce(JoinKeyWritable key, Iterable<JoinKeyWritable> values, Context context)
            throws IOException, InterruptedException {
        ArrayList<String> locationList = new ArrayList<>();
        ArrayList<String> productList = new ArrayList<>();
        for (JoinKeyWritable keyWritable : values) {
            if ("location".equals(keyWritable.getName())) {
                locationList.add(keyWritable.getValue());
            } else {
                productList.add(keyWritable.getValue());
            }
        }
        if (productList.isEmpty()) {
            // 左外连接:用户没有任何交易时,右表字段补 "null"
            for (String location : locationList) {
                context.write(new Text(location), new Text("null"));
            }
        } else {
            for (String product : productList) {
                for (String location : locationList) {
                    context.write(new Text(location), new Text(product));
                }
            }
        }
    }
}

左连接MapReduce流程驱动方法
/**
 * 左外连接作业驱动:用户表与交易表经 MultipleInputs 走各自的 Mapper,
 * 由 ProductLocationReducer 按 userId 连接后输出 (location, product)。
 *
 * @param userPath        用户表输入路径
 * @param transactionPath 交易表输入路径
 * @param outputPath      连接结果输出路径(不能已存在)
 */
public void LeftJoinDriver(String userPath, String transactionPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    // new Job(conf, name) 构造器已废弃,改用工厂方法
    Job job = Job.getInstance(conf, "LeftJoinDriver");
    // 集群运行时需要据此定位包含本作业类的 jar
    job.setJarByClass(LeftOuterJoin.class);
    Path userFilePath = new Path(userPath);
    Path transactionFilePath = new Path(transactionPath);
    MultipleInputs.addInputPath(job, userFilePath, TextInputFormat.class, UserMapper.class);
    MultipleInputs.addInputPath(job, transactionFilePath, TextInputFormat.class, TransactionMapper.class);
    job.setMapOutputKeyClass(JoinKeyWritable.class);
    job.setMapOutputValueClass(JoinKeyWritable.class);
    job.setReducerClass(ProductLocationReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    job.waitForCompletion(true);
}
中间输出键类
import lombok.Data; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @Data public class JoinKeyWritable implements WritableComparable功能聚合类{ private String name; private String value; @Override public int compareTo(JoinKeyWritable o) { return this.name.compareTo(o.name); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(name); out.writeUTF(value); } @Override public void readFields(DataInput in) throws IOException { this.name = in.readUTF(); this.value = in.readUTF(); } }
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import writable.JoinKeyWritable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
public class LeftOuterJoin {
public static class UserMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String inputString = value.toString();
String[] split = inputString.split(",");
String userId = split[0];
String location = split[1];
JoinKeyWritable outputKey = new JoinKeyWritable();
outputKey.setName(userId);
outputKey.setValue("1");
JoinKeyWritable outputValue = new JoinKeyWritable();
outputValue.setName("location");
outputValue.setValue(location);
context.write(outputKey, outputValue);
}
}
public static class TransactionMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String inputString = value.toString();
String[] split = inputString.split(",");
String product = split[1];
String userId = split[2];
JoinKeyWritable outputKey = new JoinKeyWritable();
outputKey.setName(userId);
outputKey.setValue("2");
JoinKeyWritable outputValue = new JoinKeyWritable();
outputValue.setName("product");
outputValue.setValue(product);
context.write(outputKey, outputValue);
}
}
public static class ProductLocationReducer extends Reducer {
@Override
protected void reduce(JoinKeyWritable key, Iterable values, Context context) throws IOException, InterruptedException {
ArrayList locationList = new ArrayList<>();
boolean hasProduct = false;
for (JoinKeyWritable keyWritable : values) {
String name = keyWritable.getName();
if ("location".equals(name)) {
locationList.add(keyWritable.getValue());
} else {
hasProduct = true;
for (String aLocationList : locationList) {
context.write(new Text(aLocationList), new Text(keyWritable.getValue()));
}
}
}
// 如果没有商品,那么左外连接为空
if (!hasProduct) {
for (String aLocationList : locationList) {
context.write(new Text(aLocationList), new Text("null"));
}
}
}
}
public static class LocationCountMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String inputString = value.toString();
String[] split = inputString.split("t");
String location = split[0];
String product = split[1];
context.write(new Text(product), new Text(location));
}
}
public static class LocationCountReducer extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
HashSet locationSet = new HashSet<>();
for (Text location : values) {
locationSet.add(location);
}
context.write(key, new IntWritable(locationSet.size()));
}
}
public void LeftJoinDriver(String userPath, String transactionPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = new Job(conf, "LeftJoinDriver");
Path userFilePath = new Path(userPath);
Path transactionFilePath = new Path(transactionPath);
MultipleInputs.addInputPath(job, userFilePath, TextInputFormat.class, UserMapper.class);
MultipleInputs.addInputPath(job, transactionFilePath, TextInputFormat.class, TransactionMapper.class);
job.setMapOutputKeyClass(JoinKeyWritable.class);
job.setMapOutputValueClass(JoinKeyWritable.class);
job.setReducerClass(ProductLocationReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
}
public void LocationCountDriver(String inputPath, String outputPath) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = new Job(conf, "LocationCountDriver");
job.setMapperClass(LocationCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setReducerClass(LocationCountReducer.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.waitForCompletion(true);
}
}
测试启动类(注意hdfs的命名空间等环境区别)
import org.junit.Test;
import java.io.IOException;
public class LeftOuterJoinTest {
@Test
public void testLeftOuterJoin() throws InterruptedException, IOException, ClassNotFoundException {
LeftOuterJoin leftOuterJoin = new LeftOuterJoin();
leftOuterJoin.LeftJoinDriver("D:\Destop\hadooplearn\src\data\user_info",
"D:\Destop\hadooplearn\src\data\transaction_info",
"hdfs://spark01:9000/test/leftOuterJoinResult");
}
@Test
public void testLocationCountDriver() throws InterruptedException, IOException, ClassNotFoundException {
LeftOuterJoin leftOuterJoin = new LeftOuterJoin();
leftOuterJoin.LocationCountDriver("hdfs://spark01:9000/test/leftOuterJoinResult",
"hdfs://spark01:9000/test/locationCountResult");
}
}
运行截图
用户信息数据
交易信息数据
运行成功数据
总结
总体来说,左连接的实现还是特别复杂的。而且本次实现在Spark的Join策略中属于时间复杂度最高的NPJ方式(双重for循环实现)。
如果有厉害的XD可以挑战一下MapReduce版本的**SMJ(排序双指针)和HJ(哈希)**方式。
谢谢各位,奈何拙笔水平有限,有问题望各位指点迷津。



