栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

源码解析Spark各个ShuffleWriter的实现机制(二)——BypassMergeSortShuffleWriter

源码解析Spark各个ShuffleWriter的实现机制(二)——BypassMergeSortShuffleWriter

基于3.2版本分支。

BypassMergeSortShuffleWriter 简化流程图

示例

也就是说每个ShuffleMapTask都会对应着一个FileSegment,每个FileSegment可视作一个临时文件,接着这些FileSegment中对应的文件又会合并到一份DataFile中,通过IndexFile记录每个分区在DataFile中的起始偏移量。


这种Shuffle写文件方式避免了大量小文件给文件系统造成压力的情况,先是每个ShuffleMapTask对应一个文件的方式,再将这些文件合并到一份数据文件中,并索引文件记录了每个分区在数据文件中的偏移量,能够做到随机访问指定RDD分区的数据。

有兴趣可详读BypassMergeSortShuffleWriter#write的实现,这里我给出关键注释:

// BypassMergeSortShuffleWriter.java
public void write(Iterator> records) throws IOException {
  assert (partitionWriters == null);
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
      .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // 没有需要写的记录,直接提交,结束
    if (!records.hasNext()) {
      partitionLengths = mapOutputWriter.commitAllPartitions(
        ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
      mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    for (int i = 0; i < numPartitions; i++) {
      // 为每个RDD分区创建临时的ShuffleBlock,它包含blockId和一个文件句柄,文件名即blockId
      final Tuple2 tempShuffleBlockIdPlusFile =
          blockManager.diskBlockManager().createTempShuffleBlock();
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      // 为每个RDD分区都创建一个writer
      // getDiskWriter即新建writer
      DiskBlockObjectWriter writer =
        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
      if (partitionChecksums.length > 0) {
        writer.setChecksum(partitionChecksums[i]);
      }
      partitionWriters[i] = writer;
    }
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    while (records.hasNext()) {
      // 将每条记录计算出所属的RDD分区,并调用对应分区的writer
      final Product2 record = records.next();
      final K key = record._1();
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        // 提交并获取对应的FileSegment
        // 提交实际上就是强制flush,并记录将当前写入前位置和写入长度记录到FileSegment中
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }
    // 这里将所有上边FileSegment中的数据文件都合并到一份文件中
    // 为合并后的文件生成一个索引文件,该文件记录了每个分区在合并后文件的起始偏移量
    partitionLengths = writePartitionedData(mapOutputWriter);
    mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    // ...
  }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/735828.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号