首先在Map阶段初始化时读取替换表的内容存入内存中,然后在Map阶段map方法时将其进行替换。所以只需要Map阶段即可,不需要Reduce阶段。
输入数据order.txt 订单表数据(间隔:t)
订单id 商品id 数量
1001 01 1 1002 02 2 1003 03 3 1004 01 4 1005 02 5 1006 03 6
pd.txt 商品表数据(间隔:t)
商品id 商品名字
01 小米 02 华为 03 红米Maven和log4j.properties配置
参考 MapReduce统计流量案例 中的配置
自定义Mapper类实现(MapJoinMapper)package com.test.mapreduce.mapjoin; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.HashMap; import java.util.Map; public class MapJoinMapper extends Mapper自定义Driver类实现(MapJoinDriver){ private Map pdMap = new HashMap<>(); private Text text = new Text(); @Override protected void setup(Context context) throws IOException, InterruptedException { // 通过缓存文件得到小表数据pd.txt URI[] cacheFiles = context.getCacheFiles(); Path path = new Path(cacheFiles[0]); // 获取文件系统对象 FileSystem fs = FileSystem.get(context.getConfiguration()); // 打开流 FSDataInputStream fis = fs.open(path); // 通过包装流转换为reader,方便按行读取 BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8")); // 逐行读取,按行来处理 String line; while (StringUtils.isNotEmpty(line = reader.readLine())) { // 字符串切割(数据:01 小米) String[] split = line.split("t"); // 添加到HashMap中 pdMap.put(split[0], split[1]); } // 关闭流 IOUtils.closeStream(reader); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1.获取每一行 String line = value.toString(); // 2.字符串切割 String[] split = line.split("t"); // 3.获取pname String pname = pdMap.get(split[1]); // 4.封装并拼接 text.set(split[0] + "t" + pname + "t" + split[2]); // 5.写出 context.write(text, NullWritable.get()); } }
package com.test.mapreduce.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MapJoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
// 1.获取job信息
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2.设置加载jar包路径
job.setJarByClass(MapJoinDriver.class);
// 3.关联mapper
job.setMapperClass(MapJoinMapper.class);
// 4.设置Map输出KV类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
// 5.设置最终输出KV类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 6.加载缓存数据
job.addCacheFile(new URI("file:///D:/cache/pd.txt"));
// 6.Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
// 8.设置输入输出路径
FileInputFormat.setInputPaths(job, new Path("D:\input"));
FileOutputFormat.setOutputPath(job, new Path("D:\output"));
// 9.提交
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
数据输出
1001 小米 1 1002 华为 2 1003 红米 3 1004 小米 4 1005 华为 5 1006 红米 6



