MapReduce程序的主体思想是分而治之。构建抽象模型:Map和Reduce
MapReduce中定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:
map: (k1; v1) → [(k2; v2)] 分,可以高度并行
reduce: (k2; [v2]) → [(k3; v3)] 合,将同一个分区的数据拉到一起处理
MapReduce处理的数据类型是
编写mr程序的代码片段如下
public class PartitionerMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
String jobName = PartitionerMain.class.getSimpleName();
Job job = Job.getInstance(super.getConf(),jobName);
job.setJarByClass(PartitionerMain.class);
//第一步:读取 文件,解析成key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path(args[0]));
//第二步:设置mapper类
job.setMapperClass(PartitionerMapper.class);
job.setMapOutputKeyClass(PairSort.class);//PairSort通过compareTo可以自定义排序,分组时默认也是通过compareTo来确定是否相等,toString可以自定义输出字符串形式
job.setMapOutputValueClass(Text.class);
//第三步:设置分区类,以及reducetask的个数,注意reduceTask的个数要与分区的个数一致
job.setPartitionerClass(PartitionerOwn.class);
job.setNumReduceTasks(2);
//第五步:设置combiner类
job.setCombinerClass(MyCombiner.class);
//设置自定义分组器
//覆写无参构造,将自定义的JavaBean(如PairSort)注册到自定义的分组器中
//通过重写compare方法可以重写分组逻辑
job.setGroupingComparatorClass(GroupingComparator.class);
//第三至六步都可省略,用框架的默认值
//第七步:设置reducer类
job.setReducerClass(PartitionerReducer.class);
job.setOutputKeyClass(PairSort.class);
job.setOutputValueClass(NullWritable.class);
//第八步:设置输出类及输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path(args[1]));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new PartitionerMain(), args);
System.exit(run);
}
}
粗略来说,mapTask的数量是与输入文件在hdfs上的block数相同。
更准确的来说,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,bytesRemaining < splitSize * 1.1 时为一块,然后有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。
在重写的map函数执行完之后,每条数据写入内存前会默认使用HashPartitioner进行分区,内存中的环形缓存区写满后就会溢写到磁盘(spill)。溢写时会进行排序和规约。数据处理结束后合并溢写的临时文件,最终一个mapTask只会生成一个磁盘文件,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
reduceTask的数据与分区数一致。可通过job.setNumReduceTasks(2)来设置。
每个reduceTask会根据索引文件拉取属于自己分区的数据。当copy到内存的数据达到一定阈值时,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,合并溢写文件,一个reduceTask最终只生成一个磁盘文件。数据合并时会进行排序。
nameNode管理元数据
nameNode在内存中会保存全部的完整的元数据信息,如何保证内存中的元数据0丢失?
如果是正常命令停止nameNode,会直接把元数据信息刷到磁盘里。如果是非正常关机是如何保证不丢失元数据信息的呢?
fsimage : 镜像文件,存的就是元数据信息,非常容易恢复,更新不及时
edits : 操作日志文件,即时存储除了查询之外的所有操作日志,恢复成元数据时效率低
secondarynameNode会定期拉取fsimage和edits合并成新的fsimage文件,拉取后nameNode会写新的edits文件,这样edits不会越来越大,且合并完成后的fsimage是非常接近内存中的真实元数据信息的。当重启时,会在根据fsimage和edits恢复出元数据存到内存里。



