- MapReduce 分布式缓存
- 前言
- 一、Map Side join
- 二、分布式缓存
- 1.概念
- 2.代码实现
- 3.驱动类代码
- 4.在Yarn上运行代码
前言
提示:以下是本篇文章正文内容,下面案例可供参考
一、Map Side joinmap Side Join 就是在 map阶段执行join关联操作,并且程序也没有了reduce阶段。避免了 shuffle时候的繁琐。 实现的的关键是使用MapReduce的分布式缓存。
二、分布式缓存 1.概念分布式缓存的使用必须使用MapReduce的yarn模式运行。
join要处理的数据集,使用分布式缓存将小数据或者基本固定不变的数据或者文档进行分布式缓存。
在mapReduce的框架下,会自动将缓存的数据分发到各个maptast。
程序只会在mapper阶段将 缓存的文件或数据集读取出来,然后和自己读取的数据进行join关联 最后输出结果。
代码如下(示例):
1.重写Mapper的初始化方法 setup(),读取缓存文件。
2.缓存文件是以流的形式进行读取。
3.将读取的数据存储在 集合当中。
4.将读取的数据与自己读取的文件进行join合并操作。
5.直接输出。
public class MapJoinMapper extends Mapper3.驱动类代码{ Text keyOut = new Text(); Map map = new HashMap<>(); @Override protected void setup(Mapper .Context context) throws IOException, InterruptedException { //读取文件 BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("itheima_goods.txt"))); String line = null; while ((line=br.readLine()) != null){ //一行数据格式为: 100101|155083444927602|四川果冻橙6个约180g(商品id,商品编号,商品名称 String[] split = line.split("\|"); map.put(split[0],split[1]+"t"+split[2]); } } @Override protected void map(LongWritable key, Text value, Mapper .Context context) throws IOException, InterruptedException { String[] split = value.toString().split("\|"); String s = map.get(split[1]); keyOut.set(value.toString()+"t"+s); context.write(keyOut,NullWritable.get()); } }
其它步骤基本不变。
添加归档文件到分布式缓存中 job.addCacheArchive(URI uri);
添加普通文件到分布式缓存中 job.addCacheFile(URI uri);
public class MapJoinDriver {
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
// 创建作业实例
Job job = Job.getInstance(conf, MapJoinDriver.class.getSimpleName());
// 设置作业驱动类
job.setJarByClass(MapJoinDriver.class);
// 设置作业mapper
job.setMapperClass(MapJoinMapper.class);
// 设置作业mapper阶段输出key value数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//设置作业最终输出数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// todo 添加分布式缓存文件
job.addCacheFile(new URI("/data/join/cache/itheima_goods.txt"));
// 不需要reduce,那么也就没有了shuffle过程
job.setNumReduceTasks(0);
// 配置作业的输入数据路径
FileInputFormat.addInputPath(job, new Path(args[0]));
// 配置作业的输出数据路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交作业并等待执行完成
boolean b = job.waitForCompletion(true);
// System.exit(b ? 0 :1);
}
}
4.在Yarn上运行代码
将代码打包 ,放到服务上。
运行jar包
hadoop jar (jar包名称)输入的文件路径 输出的文件路径
hadoop jar example-hdfs.jar /data/join/input /data/join/output



