数据内容
chinese.txt
1 lisi 89 2 lisi 73 3 lisi 67 1 zhangyang 49 2 zhangyang 83 3 zhangyang 27 1 lixiao 77 2 lixiao 66 3 lixiao 89
english.txt
1 lisi 75 2 lisi 94 3 lisi 100 1 zhangyang 61 2 zhangyang 59 3 zhangyang 98 1 lixiao 25 2 lixiao 47 3 lixiao 48
math.txt
1 lisi 75 2 lisi 94 3 lisi 100 1 zhangyang 61 2 zhangyang 59 3 zhangyang 98 1 lixiao 25 2 lixiao 47 3 lixiao 48
1. Score
package hadoop_test.mutil_files_09.domain;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
 * Per-student score record used as the MapReduce value type.
 *
 * <p>Holds one partial record per subject file on the map side (only one of
 * {@code chinese}/{@code english}/{@code math} is set), and the merged totals
 * on the reduce side. Implements Hadoop {@link Writable}: fields are
 * serialized and deserialized in the same fixed order (name, chinese,
 * english, math), which is required for correct round-tripping.
 */
public class Score implements Writable {

    // Join key: the student name.
    private String name;
    // One field per subject; unset fields stay 0 so reduce-side summation works.
    private int chinese;
    private int english;
    private int math;

    /** Serializes the fields; order must mirror {@link #readFields(DataInput)}. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(chinese);
        out.writeInt(english);
        out.writeInt(math);
    }

    /** Deserializes the fields; order must mirror {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.chinese = in.readInt();
        this.english = in.readInt();
        this.math = in.readInt();
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getChinese() {
        return chinese;
    }

    public void setChinese(int chinese) {
        this.chinese = chinese;
    }

    public int getEnglish() {
        return english;
    }

    public void setEnglish(int english) {
        this.english = english;
    }

    public int getMath() {
        return math;
    }

    public void setMath(int math) {
        this.math = math;
    }

    /** Human-readable form written by TextOutputFormat as the output value. */
    @Override
    public String toString() {
        return String.format("Score [name=%s, chinese=%d, english=%d, math=%d]",
                name, chinese, english, math);
    }
}
2. ScoreDriver
package hadoop_test.mutil_files_09.score;
import hadoop_test.Utils_hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import hadoop_test.mutil_files_09.domain.Score;
import org.apache.log4j.BasicConfigurator;
/**
 * Driver for the multi-file score join: reads chinese.txt / english.txt /
 * math.txt from one HDFS directory, joins them per student on the reduce
 * side, and writes merged {@link Score} records.
 */
public class ScoreDriver {

    public static void main(String[] args) throws Exception {
        // Run HDFS operations as root regardless of the local OS user.
        System.setProperty("HADOOP_USER_NAME", "root");
        BasicConfigurator.configure();

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "multi-file-score-join");
        job.setJarByClass(ScoreDriver.class);
        job.setMapperClass(ScoreMapper.class);
        job.setReducerClass(ScoreReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Score.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Score.class);

        // NOTE(review): the output directory is nested inside the input
        // directory, and MapReduce fails if it already exists — delete
        // ".../result" before re-running (the imported Utils_hadoop helper
        // presumably does this; confirm and wire it in).
        FileInputFormat.setInputPaths(job, new Path("/hadoop_test/m_file_test"));
        FileOutputFormat.setOutputPath(job, new Path("/hadoop_test/m_file_test/result"));

        // Propagate job success/failure to the shell instead of always exiting 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
3. ScoreMapper
package hadoop_test.mutil_files_09.score;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import hadoop_test.mutil_files_09.domain.Score;

/**
 * Map side of the multi-file join.
 *
 * <p>Each input line has the form {@code "<month> <name> <score>"},
 * e.g. {@code "1 lisi 89"}. The name of the file the split came from
 * (chinese.txt / english.txt / math.txt) determines which subject field
 * of the emitted {@link Score} is populated; the student name is the
 * join key.
 */
public class ScoreMapper extends Mapper<LongWritable, Text, Text, Score> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // FileSplit exposes the path of the file this split belongs to;
        // the file name tells us which subject this record carries.
        FileSplit split = (FileSplit) context.getInputSplit();
        String filename = split.getPath().getName();

        // Split the line once instead of re-splitting per field.
        String[] fields = value.toString().split(" ");
        String name = fields[1];
        int score = Integer.parseInt(fields[2]);

        Score s = new Score();
        s.setName(name);
        // Exactly one subject matches per file, so an else-if chain suffices.
        if (filename.equals("chinese.txt")) {
            s.setChinese(score);
        } else if (filename.equals("english.txt")) {
            s.setEnglish(score);
        } else if (filename.equals("math.txt")) {
            s.setMath(score);
        }
        context.write(new Text(name), s);
    }
}
package hadoop_test.mutil_files_09.score;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import hadoop_test.mutil_files_09.domain.Score;

/**
 * Reduce side of the multi-file join: merges the per-subject partial
 * {@link Score} records emitted by {@code ScoreMapper} for one student
 * into a single record. Each incoming value has at most one non-zero
 * subject field, so summing field-by-field yields per-subject totals.
 */
public class ScoreReducer extends Reducer<Text, Score, Text, Score> {

    @Override
    protected void reduce(Text key, Iterable<Score> values, Context context)
            throws IOException, InterruptedException {
        // Accumulator for this student; unset subject fields start at 0.
        Score total = new Score();
        total.setName(key.toString());
        for (Score value : values) {
            total.setChinese(total.getChinese() + value.getChinese());
            total.setEnglish(total.getEnglish() + value.getEnglish());
            total.setMath(total.getMath() + value.getMath());
        }
        context.write(key, total);
    }
}
数据结果
Hadoop学习:MapReduce实现两张表合并
Hadoop-MapReduce(多表合并)
Hadoop案例:Mapper端多表合并



