[root@master ~]# mv score.txt /usr/local/soft/data
[root@master ~]# cd /usr/local/soft/data
[root@master data]# ls
new_db.sql  student.sql  theZen.txt  score.sql  students.txt  wordcount  score.txt  theZenOfPython.txt  words.txt
[root@master data]# hdfs dfs -mkdir -p /data/score/input
[root@master data]# hdfs dfs -put score.txt /data/score/input/
[root@master data]# cd ..
[root@master soft]# cd jars/
[root@master jars]# ls
hadoop-1.0-SNAPSHOT.jar
[root@master jars]# rm hadoop-1.0-SNAPSHOT.jar
rm:是否删除普通文件 "hadoop-1.0-SNAPSHOT.jar"?y
[root@master jars]# rz -E
rz waiting to receive.
[root@master jars]# hadoop jar hadoop-1.0-SNAPSHOT.jar com.shujia.MapReduce.Demo03Join
package com.shujia.MapReduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
/**
 * Reduce-side join of student records with score records, keyed by student id.
 *
 * <p>Both inputs are comma-separated text. A line with five fields is treated as a
 * student record (field 1 is the name, field 4 is the class) and a line with three
 * fields is treated as a score record (field 1 is the subject id, field 2 is the
 * score). Field 0 is the student id in both cases.
 *
 * <p>The mapper tags every record with its origin and emits it under the student id;
 * the reducer then sees all records of one student together and writes one output
 * line per score record: {@code id,name,class,subjectId,score}.
 */
public class Demo03Join {

    /** Prefix put in front of student records so the reducer can recognise them. */
    private static final String STUDENT_TAG = "#";

    /** Prefix put in front of score records so the reducer can recognise them. */
    private static final String SCORE_TAG = "$";

    /** Number of comma-separated fields in a student record. */
    private static final int STUDENT_FIELD_COUNT = 5;

    /** Number of comma-separated fields in a score record. */
    private static final int SCORE_FIELD_COUNT = 3;

    /**
     * Map side: tags each input line with its source table and emits it keyed by
     * student id.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        // Reused across calls; context.write serialises the contents immediately.
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Emits {@code (studentId, tag + line)} for student and score lines.
         *
         * <p>Lines whose field count matches neither table are dropped here. They
         * carry no tag, so the reducer would ignore them anyway; dropping them
         * early keeps them out of the shuffle.
         *
         * @param key     byte offset of the line in its input file (unused)
         * @param value   one line of either input file
         * @param context task context used to emit the tagged record
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] splits = line.split(",");

            // The field count tells the two inputs apart.
            String tag;
            if (splits.length == STUDENT_FIELD_COUNT) {
                tag = STUDENT_TAG;
            } else if (splits.length == SCORE_FIELD_COUNT) {
                tag = SCORE_TAG;
            } else {
                return;
            }

            outKey.set(splits[0]);
            outValue.set(tag + line);
            context.write(outKey, outValue);
        }
    }

    /**
     * Reduce side: combines the student record of one id with each of that id's
     * score records.
     */
    public static class MyReducer extends Reducer<Text, Text, Text, Text> {

        // Reused across calls; context.write serialises the contents immediately.
        private final Text outValue = new Text();

        /**
         * Writes one {@code name,class,subjectId,score} value per score record of
         * the given student id.
         *
         * <p>A student with no score records produces no output. Score records
         * whose student record is missing are still written, with empty name and
         * class. If several student records share an id, the last one seen wins.
         *
         * @param key     the student id
         * @param values  all tagged records emitted for this id
         * @param context task context used to emit the joined rows
         */
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String name = "";
            String clazz = "";

            // values can only be traversed once, and the student record may come
            // after some score records, so the score records are buffered as
            // Strings until the whole group has been read.
            ArrayList<String> scoreRecords = new ArrayList<>();
            for (Text value : values) {
                String record = value.toString();
                // The tag is always the first character; checking the prefix
                // (rather than "contains") keeps data that happens to include
                // '#' or '$' from being misclassified.
                if (record.startsWith(STUDENT_TAG)) {
                    String[] fields = record.split(",");
                    name = fields[1];
                    clazz = fields[4];
                } else if (record.startsWith(SCORE_TAG)) {
                    scoreRecords.add(record);
                }
            }

            for (String record : scoreRecords) {
                String[] fields = record.split(",");
                String subjectId = fields[1];
                String subjectScore = fields[2];
                outValue.set(name + "," + clazz + "," + subjectId + "," + subjectScore);
                context.write(key, outValue);
            }
        }
    }

    /**
     * Driver: configures and runs the join job, then exits the JVM with status 0
     * if the job succeeded and 1 otherwise.
     *
     * <p>Input directories, output directory and the HDFS address are hard-coded.
     * An existing output directory is deleted before the job starts.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        // Separate key and value with a comma in the text output.
        conf.set("mapred.textoutputformat.separator", ",");

        Job job = Job.getInstance(conf);
        job.setJobName("Demo03Join");
        // Lets Hadoop locate the jar that contains this class.
        job.setJarByClass(Demo03Join.class);

        // Map side.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Reduce side.
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Both tables are read by the same mapper.
        FileInputFormat.addInputPath(job, new Path("/data/stu/input"));
        FileInputFormat.addInputPath(job, new Path("/data/score/input"));

        // The job fails if the output path already exists, so remove any
        // previous result first.
        Path path = new Path("/data/join/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        // Propagate the job result to the caller instead of always exiting 0.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
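MapJoin(Map端关联):
适用场景:大表关联小表
小表:默认25M以下(该阈值是Hive中hive.mapjoin.smalltable.filesize的默认值;手写MapReduce时只要求小表能完整放入每个MapTask的内存)
大表:无大小上限
原理:将小表进行广播,广播到每一个处理大表的Map任务当中;每个MapTask在setup阶段把小表加载到内存,在map阶段直接完成关联,不需要Reduce阶段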
package com.shujia.MapReduce;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Map-side join of score records (the large table) with student records (the
 * small table).
 *
 * <p>The students file is registered as a cache file by the driver. Every map
 * task loads it into memory once in {@code setup} and then joins each score
 * line against that in-memory table, so no reduce phase is needed.
 *
 * <p>Output lines have the form {@code id,name,class,subjectId,score}. Score
 * lines whose student id is not found in the small table are skipped.
 */
public class Demo04MapJoin {

    /**
     * Mapper that holds the small table in memory and joins each score line
     * against it.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

        /** Student id mapped to the pre-joined {@code "name,class"} string. */
        private final Map<String, String> studentsById = new HashMap<>();

        // Reused across calls; context.write serialises the contents immediately.
        private final Text outKey = new Text();
        private final Text outValue = new Text();

        /**
         * Runs once per map task: reads the cached students file and keeps the
         * name and class of every well-formed record in memory.
         *
         * <p>Student lines with fewer than five fields cannot supply name and
         * class and are left out of the table.
         *
         * @param context task context giving access to the cache file list and
         *                the job configuration
         * @throws IOException if no cache file was registered or the file cannot
         *                     be read
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            URI[] cacheFiles = context.getCacheFiles();
            if (cacheFiles == null || cacheFiles.length == 0) {
                throw new IOException(
                        "No cache file registered; the driver must call Job.addCacheFile "
                                + "with the students file");
            }

            // Resolve the file system from the path itself so the URI's own
            // scheme/authority is honoured.
            Path smallTable = new Path(cacheFiles[0]);
            FileSystem fs = smallTable.getFileSystem(context.getConfiguration());

            // try-with-resources closes the HDFS stream even if reading fails.
            // UTF-8 is given explicitly because Hadoop's Text, used for the large
            // table, is always UTF-8; the platform default might differ.
            try (FSDataInputStream in = fs.open(smallTable);
                    BufferedReader br = new BufferedReader(
                            new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    String[] fields = line.split(",");
                    if (fields.length >= 5) {
                        // Split once here instead of once per score record.
                        studentsById.put(fields[0], fields[1] + "," + fields[4]);
                    }
                }
            }
        }

        /**
         * Joins one score line with the in-memory student table.
         *
         * <p>Lines with fewer than three fields and lines whose student id is
         * unknown are skipped.
         *
         * @param key     byte offset of the line in the input file (unused)
         * @param value   one score line: {@code id,subjectId,score}
         * @param context task context used to emit the joined row
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splits = value.toString().split(",");
            // Guard against blank or truncated lines, which would otherwise
            // fail the whole task with an index error.
            if (splits.length < 3) {
                return;
            }

            String id = splits[0];
            String nameAndClazz = studentsById.get(id);
            if (nameAndClazz == null) {
                return;
            }

            String subjectId = splits[1];
            String subjectScore = splits[2];
            outKey.set(id);
            outValue.set(nameAndClazz + "," + subjectId + "," + subjectScore);
            context.write(outKey, outValue);
        }
    }

    /**
     * Driver: configures and runs the map-only join job, then exits the JVM with
     * status 0 if the job succeeded and 1 otherwise.
     *
     * <p>Input directory, cache file, output directory and the HDFS address are
     * hard-coded. An existing output directory is deleted before the job starts.
     * Because the job has no reduce phase, the output is written by the map tasks
     * directly and is not sorted by key.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");
        // Separate key and value with a comma in the text output.
        conf.set("mapred.textoutputformat.separator", ",");

        Job job = Job.getInstance(conf);
        job.setJobName("Demo04MapJoin");
        // Lets Hadoop locate the jar that contains this class.
        job.setJarByClass(Demo04MapJoin.class);

        // Map side.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // The join is finished inside the mapper, so skip shuffle and reduce.
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Large table: processed split by split by the map tasks.
        FileInputFormat.addInputPath(job, new Path("/data/score/input"));
        // Small table: distributed to every map task as a cache file.
        job.addCacheFile(new URI("hdfs://master:9000/data/stu/input/students.txt"));

        // The job fails if the output path already exists, so remove any
        // previous result first.
        Path path = new Path("/data/mapJoin/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        // Propagate the job result to the caller instead of always exiting 0.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}



