一,以hdfs路径/tmp/table/student_score.txt为输入,表结构为(学号,姓名,课程名称,成绩),字段间分隔符为tab,如下图所示。通过设置reduce个数为2,自定义hash partition实现将其中姓名为"张一"的放到同一个reduce中,非张一的放到其它的reduce中,输出结果字段为(学号,姓名,课程名称,成绩),按tab分隔即可。
具体实现代码(java部分):
package com.hadoop.mr.homework2;
import com.hadoop.ReadFromHdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class HomeWork2 {
// 设计kv
// key 就是学号
// value 除了学号之外的其它内容
public static class HomeWork2Mapper extends Mapper
二,白名单问题
这里有个注意点:
//特殊的正则表达式 \r?\n 为的是能让不同的系统都能识别分隔
package com.hadoop.mr.homework2;
import com.hadoop.ReadFromHdfs;
import com.hadoop.mr.homework.HomeWork1;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class WhiteList {
public static class WhiteListMapper extends Mapper
三,流式分组问题
已排序好文本文件的分组-流式分组)给定一个本地文本文件finance_record_sorted.txt,共2个字段(工号,报销费用),其中按工号升序排列,并用tab分隔。求对该数据进行按工号字段的分组,并在控制台输出如下图所示的结果。
package com.hadoop.mr.homework2;
import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
public class CurrentGroup {
public static void main(String[] args) throws IOException {
//1,逐行读取数据
String path= "E:\daily_expense.txt";
FileInputStream fis =new FileInputStream(path);
BufferedReader br =new BufferedReader(new InputStreamReader(fis));
//2,将S作为key,后面的作为集合存储
String line=null;
String first=null;
List list =new ArrayList<>();
while((line = br.readLine()) != null){
String[] values =line.split("t");
//3,如果key一样,则vlaue存入集合组中
if(first==null){
first = values[0];
list.add(values[1]);
}else if(first.equals(values[0])){
list.add(values[1]);
}else{
//4,如果key不一样,则输出上一个key,并将上一个key替换为新的key
System.out.println(first+ " "+list);
list.clear();
first=values[0];
list.add(values[1]);
}
}
//5,输出最后一组key,val
System.out.println(first+ " "+list);
br.close();
fis.close();
}
}
结果展示: