多个路径进行判断,不同数据输出不同,整合学生和成绩
public class Demo03JoinStuSco {
public static class JoinMapper extends Mapper {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、获取数据的输入路径InputSplit
//context上面是hdfs
InputSplit inputSplit = context.getInputSplit();
FileSplit fs = (FileSplit) inputSplit;
//2、获取路径
String url = fs.getPath().toString();
//3、判断
if (url.contains("students")) { //true表示数据为students学生数据
String[] split = value.toString().split(",");
String id = split[0];
//4、打标记,针对不同数据,方便reduce端处理
String line = "*" + value.toString();
//学号作为key
context.write(new Text(id), new Text(line));
} else { //false 表示为学生成绩数据
String[] split = value.toString().split(",");
String id = split[0];
//5、打标记,针对不同数据,方便reduce端处理
String line = "#" + value.toString();
context.write(new Text(id), new Text(line));
}
}
}
public static class JoinReduce extends Reducer {
@Override
protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
//保存数据
String stu = "";
//集合临时存储成绩
ArrayList list = new ArrayList();
for (Text value : values) {
//获取一行数据进行判断
String line = value.toString();
if (line.startsWith("*")) { //true 为学生数据
stu = line.substring(1);
} else { //是成绩数据
String substring = line.substring(1);
list.add(substring);
}
}
//数据写出,全写出
// for (String string : list) {
// String[] split = string.split(",");
// String proj = split[1];
// String sc = split[2];
// String end = stu + "," + proj + "," + sc;
// context.write(new Text(end), NullWritable.get());
// }
//数据写出,求总分
long sum = 0L;
for (String string : list) {
String[] split = string.split(",");
String score = split[2];
int i = Integer.parseInt(score);
sum += i;
}
String end = stu + "," + sum;
context.write(new Text(end), NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
// 1)获取配置信息,获取job对象实例
Job job = Job.getInstance();
job.setJobName("map-reduce2学生成绩连表Join");
// 2)指定本程序的jar包所在的本地路径
job.setJarByClass(Demo03JoinStuSco.class);
// 3)关联Mapper/Reduce业务类
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReduce.class);
// 4)指定Mapper输出数据的kv
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// 5)指定最终输出数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 6)指定job的输入原始文件所在目录
FileInputFormat.addInputPath(job, new Path("/data"));
// 7)指定job的 输出结果所在目录
FileOutputFormat.setOutputPath(job, new Path("/output"));
// 8)提交作业
boolean b = job.waitForCompletion(true);
System.out.println("map-reduce2学生成绩连表Join执行完成");
}
}



