HDFS
HDFS写(上传)数据流程HDFS读(下载)数据流程NameNode和Secondary NameNode工作机制DataNode工作机制 MapReduce
序列化输入切片与MapTask并行度
FileInputFormat切片机制TextInputFormatCombineTextInputFormat切片机制 MapReduce工作流程Partition分区自定义排序自定义Combiner输出OutputFormat实现Join
在Reduce阶段Join在Map阶段Join Yarn
HDFS HDFS写(上传)数据流程注意点:
- 每一个block要写入多个DataNode,客户端写入第一个节点,由该节点将数据传给其他副本存储节点。以Packet为单位传输,64K,后面有一个4字节的校验位。副本存储节点的选择
a. 客户端本地节点
b. 与副本一不同机架的随机节点
c. 与副本二同一机架的随机节点
- 先读完第一个block,再读下一个。
- NameNode中的元数据信息除了加载内存中,还会存放在磁盘的FsImage文件中。有更新操作,内存中的元数据信息直接修改,并追加写入Edits。Secondary NameNode用于定期合并FsImage和Edits。通常情况下,Secondary NameNode每隔一小时执行一次。一分钟检查一次操作次数,当操作次数达到1百万时,Secondary NameNode执行一次。
- DataNode每6小时向NameNode上报所有的块信息。心跳3秒一次。10分钟+30秒没有收到心跳,NameNode判断DataNode挂掉。DataNode读取数据时会使用校验算法crc(32),md5(128),sha1(160),且周期性检查,保证数据完整性。
实现序列化接口Writable
- 无参构造函数提供参数的getter和setter方法实现序列化方法write()和反序列化方法readFields(),注意顺序一致重写toString()方法
MapTask的个数 = 输入数据的切片个数。
抽象类InputFormat,子类抽象类FileInputFormat,子类有TextInputFormat,CombineFileInputFormat
- 分别对每个文件进行切片,切片大小计算规则为Math.max(minSize, Math.min(maxSize, blockSize)),默认minSize = 1,maxSize = Long.MAX_VALUE小文件也会算一个切片
TextInputFormat是FileInputFormat的实现类,按行读取每条记录。键是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何行终止符(换行符和回车符),Text类型。
CombineTextInputFormat切片机制CombineTextInputFormat是CombineFileInputFormat的实现类。
在驱动类中添加代码如下:
// 如果不设置InputFormat,它默认用的是TextInputFormat.class job.setInputFormatClass(CombineTextInputFormat.class); //虚拟存储切片最大值设置4m CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较。
- 如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
- MapTask的结果写入环形缓冲区(默认100M),写入meta元信息(索引、分区)和数据(键值),达到80%开始溢写(spill)到磁盘文件。溢写前,使用快速排序算法对同一分区的数据进行排序。对溢写的临时文件使用归并排序算法对同一分区的数据进行合并。如果有combiner可以进行局部combine。每一个MapTask最终得到一个临时文件。ReduceTask的个数 = 分区个数。ReduceTask会读取不同的MapTask临时文件的同一分区的数据,使用归并排序算法排序后,对相同key的一组数据进行一次reduce操作。
默认分区是按照key的哈希值进行分区。
可以继承Partitioner类实现自定义分区。在驱动类中使用job.setNumReduceTasks(5);设置ReduceTask的个数。
- 如果ReduceTask的个数大于分区数量,输出结果会出现空文件;如果ReduceTask的个数小于分区数量,会报错;如果ReduceTask的个数为1,不会调用自定义分区方法。
实现WritableComparable接口
public interface WritableComparableextends Writable, Comparable
该接口除了继承序列化接口Writable,还继承了Comparable接口,需要重写compareTo()方法。
自定义Combiner自定义Combiner继承Reducer类,重写reduce()方法。
Combiner在每一个MapTask所在节点运行,而Reducer对所有Mapper的结果进行操作。
抽象类OutputFormat,抽象类子类FileOutputFormat,子类TextOutputFormat。
可以继承FileOutputFormat实现自定义输出,需要继承RecordWriter类。
定义一个Bean类,成员变量为两个表的字段,加上一个flag,标志是来自哪张表。
在Map阶段Join如果一张表数据大小很小,使用DistributedCache。
具体使用方法如下:
在驱动类加载缓存。
// 加载缓存数据
job.addCacheFile(new URI("file:///D:/input/tablecache/pd.txt"));
// Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
job.setNumReduceTasks(0);
在Mapper类的setup()方法中读取缓存文件。
protected void setup(Context context) throws IOException, InterruptedException {
//通过缓存文件得到小表数据pd.txt
URI[] cacheFiles = context.getCacheFiles();
Path path = new Path(cacheFiles[0]);
//获取文件系统对象,并开流
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream fis = fs.open(path);
//通过包装流转换为reader,方便按行读取
BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
//逐行读取,按行处理
String line;
while (StringUtils.isNotEmpty(line = reader.readLine())) {
//切割一行
//01 小米
String[] split = line.split("t");
pdMap.put(split[0], split[1]);
}
//关流
IOUtils.closeStream(reader);
}
Yarn


