MapReduce的逻辑数据流
第一步、需要一个 map函数:
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; //hadoop 本身提供了一套可优化网络序列化传输的基本类型,而不直接使用java内嵌的类型 import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapreduce.Mapper; public class MaxTemperatureMapper extends Mapper{ //Mapper类是一个泛型类型,分别指定map函数的输入键,输入值,输出键,输出值的类型, private static final int MISSING = 9999; @Override public void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+'){//返回指定索引处的字符 airTemperature = Integer.parseInt(line.substring(88,92)); }else{ airTemperature = Integer.parseInt(line.substring(87,92)); } String quality = line.substring(92,93); if (airTemperature != MISSING && quality.matches("[01459]")){ context.write(new Text(year),new IntWritable(airTemperature)); } } }
第二步、需要一个reduce函数:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MaxTemperatureReducer extends Reducer{ //reduce函数同样也有四个形式参数类型用于指定输入和输出类型 //reduce函数的输入类型必须匹配map函数的输出类型:即text和IntWritable //输出同样 @Override public void reduce(Text key,Iterable values,Context context) throws IOException,InterruptedException{ int maxValue = Integer.MIN_VALUE; for(IntWritable value:values){ maxValue = Math.max(maxValue,value.get()); } context.write(key,new IntWritable(maxValue)); } }
第三步、用来运行作业的代码:
import javafx.scene.text.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;
import java.io.FileOutputStream;
import java.io.IOException;
public class MaxTemperature {
public static void main(String[] args) throws IOException {
if (args.length !=2){
System.out.println("Usage:MaxTemperature ");
System.exit(-1);
}
Configuration config = new Configuration();
Job job = Job.getInstance (config);
// Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max Temperature");
//路径可以是单个文件,也可以是一个目录或符合特定文件模式的一系列文件
//可多次调用实现多路径输入
FileInputFormat.addInputPath(job,new Path(args[0]));
//输出前目录应该是不存在的
FileOutputFormat.setOutputPath(job,new Path(args[1]);
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
try {
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}



