
2021-12-18 hadoop3 Write Data Flow (Part 1): Creating the Output Stream

The source code here is from hadoop-3.3.0; corrections are welcome (the walkthrough focuses on the main flow, so much of the quoted code is left uncommented).

1 Overview

  1. Send the create-file request: the client calls DistributedFileSystem.create();
  2. NameNode creates a file record: DistributedFileSystem sends an RPC request to the NameNode, which checks permissions (write access, and whether the file already exists) and then creates a record. The client gets back an FSDataOutputStream that wraps a DFSOutputStream;
  3. Client writes data: DFSOutputStream splits the data into packets (a 64 KB packet is made up of chunks; each chunk is 512 bytes of data plus a 4-byte checksum) and appends them to an internal queue. The DataStreamer asks the NameNode to allocate suitable new blocks for the replicas, yielding a list of DataNodes. These DataNodes form a pipeline (adjacent DataNodes in the pipeline communicate over socket streams);
  4. Stream data through the pipeline: the DataStreamer streams each packet to the first DataNode in the pipeline, which forwards it to the second, and so on until the last;
  5. Ack queue: each DataNode acknowledges the packets it receives; the acknowledgements from the pipeline form an ack queue. Once every DataNode in the pipeline has acknowledged a packet, the packet is removed from the queue;
  6. Close: the client calls close() on the stream, which flushes all remaining data into the DataNode pipeline and waits for acknowledgements before telling the NameNode that the file write is complete;
  7. NameNode confirmation: the NameNode already knows which blocks the file consists of, so it only waits for the blocks to be minimally replicated before returning success;
  8. Failure handling: if a failure occurs during the write, the pipeline is closed and all in-flight packets are put back on the queue so that none are lost. The current block on a healthy DataNode is given a new identity, which is reported to the NameNode so the failed DataNode can delete its partial block when it recovers. The failed DataNode is removed from the pipeline and the remaining data of the block is written to the remaining healthy DataNodes. When the NameNode notices the block is under-replicated, it arranges a new replica on another node;
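
The packet/chunk arithmetic in step 3 can be sketched as follows. The sizes are the defaults described above (a roughly 64 KB packet, 512-byte data chunks, 4-byte CRC checksums); the class and helper names are illustrative, not Hadoop API:

```java
// Sketch of HDFS packet/chunk sizing under the defaults described above:
// a ~64 KB packet is filled with chunks of 512 bytes of user data plus a
// 4-byte CRC32 checksum each. Names here are illustrative stand-ins.
public class PacketSizing {
    static final int PACKET_SIZE = 64 * 1024; // ~64 KB packet
    static final int CHUNK_DATA = 512;        // bytes of user data per chunk
    static final int CHECKSUM = 4;            // checksum bytes per chunk

    // Number of whole chunks that fit in one packet (ignoring the packet header).
    static int chunksPerPacket() {
        return PACKET_SIZE / (CHUNK_DATA + CHECKSUM);
    }

    // User data carried by one full packet.
    static int dataPerPacket() {
        return chunksPerPacket() * CHUNK_DATA;
    }

    public static void main(String[] args) {
        System.out.println("chunks/packet = " + chunksPerPacket());
        System.out.println("data/packet   = " + dataPerPacket() + " bytes");
    }
}
```

Note that because each chunk carries a checksum, a 64 KB packet holds slightly less than 64 KB of user data.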

2 Creating the INodeFile

This series uses uploading a file to an HDFS cluster as its running example. From the earlier article on the hadoop3 directory tree, we know that a file in the cluster is represented by an INodeFile object, so we first look at how an INodeFile is created.

2.1 DistributedFileSystem#create

The abstract definition of create in the parent class FileSystem:

public abstract FSDataOutputStream create(Path f,
                                          FsPermission permission,
                                          boolean overwrite,
                                          int bufferSize,
                                          short replication,
                                          long blockSize,
                                          Progressable progress) throws IOException;

The DistributedFileSystem implementation is shown below. Before calling DFSClient#create to build the FSDataOutputStream, it records some operation statistics:

@Override
public FSDataOutputStream create(Path f, FsPermission permission,
                                 boolean overwrite, int bufferSize, short replication, long blockSize,
                                 Progressable progress) throws IOException {
    return this.create(f, permission,
                       overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
                       : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
                       blockSize, progress, null);
}


@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
                                 final EnumSet<CreateFlag> cflags, final int bufferSize,
                                 final short replication, final long blockSize,
                                 final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
    // statistics: counts of completed reads, writes, etc. on this file system;
    // record one more write operation
    statistics.incrementWriteOps(1);
    // storageStatistics: per-storage counters of DFS operations;
    // record one more create operation
    storageStatistics.incrementOpCounter(OpType.CREATE);
    // turn the path of the file being uploaded into an absolute path
    Path absF = fixRelativePart(f);
    // resolve() first invokes doCall(Path) with the given FileSystem and Path.
    // If that fails with an UnresolvedLinkException, it resolves the symlink
    // and retries the call via next(FileSystem, Path).
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      @Override
      public FSDataOutputStream doCall(final Path p) throws IOException {
        // delegate to DFSClient#create to build the DFSOutputStream
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
            cflags, replication, blockSize, progress, bufferSize,
            checksumOpt);
        return safelyCreateWrappedOutputStream(dfsos);
      }
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF);
}
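
The resolve/doCall/next retry pattern above can be modeled without any Hadoop dependencies. This is a simplified sketch: UnresolvedLinkException and the Resolver class stand in for the real FileSystemLinkResolver machinery, and the paths are made up:

```java
import java.io.IOException;

// Simplified model of FileSystemLinkResolver: try doCall on the original path;
// if it fails because of an unresolved symlink, retry via next on the resolved
// path. All names here are illustrative stand-ins, not Hadoop classes.
public class LinkResolverSketch {
    static class UnresolvedLinkException extends IOException {}

    abstract static class Resolver<T> {
        abstract T doCall(String path) throws IOException;
        abstract T next(String resolvedPath) throws IOException;

        T resolve(String path, String resolvedPath) throws IOException {
            try {
                return doCall(path);          // first attempt on the raw path
            } catch (UnresolvedLinkException e) {
                return next(resolvedPath);    // retry on the resolved target
            }
        }
    }

    static String open(final boolean pathIsSymlink) {
        try {
            return new Resolver<String>() {
                String doCall(String p) throws IOException {
                    if (pathIsSymlink) throw new UnresolvedLinkException();
                    return "opened:" + p;
                }
                String next(String p) { return "opened:" + p; }
            }.resolve("/links/file", "/data/file");
        } catch (IOException e) {
            return "error";
        }
    }

    public static void main(String[] args) {
        System.out.println(open(false)); // direct path works
        System.out.println(open(true));  // symlink forces the retry path
    }
}
```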

2.2 DFSClient#create

DistributedFileSystem#create delegates through a chain of DFSClient#create overloads; the final overload is:

public DFSOutputStream create(String src, FsPermission permission,
            EnumSet<CreateFlag> flag, boolean createParent, short replication,
            long blockSize, Progressable progress, int buffersize,
            ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
            String ecPolicyName, String storagePolicy)
    throws IOException {
    // verify the client is still connected to the NameNode
    checkOpen();
    // apply the umask to derive the effective permission
    final FsPermission masked = applyUMask(permission);
    LOG.debug("{}: masked={}", src, masked);
    // call DFSOutputStream.newStreamForCreate() to build the output stream object
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        dfsClientConf.createChecksum(checksumOpt),
        getFavoredNodesStr(favoredNodes), ecPolicyName, storagePolicy);
    beginFileLease(result.getFileId(), result);
    return result;
}
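
The applyUMask step can be illustrated with plain POSIX-style bit arithmetic. This assumes standard umask semantics (the effective mode is the requested permission with the umask bits cleared); the class below is a hypothetical stand-in, not the FsPermission API:

```java
// POSIX-style umask application: clear every permission bit that is set in
// the umask. A stand-in sketch, not Hadoop's FsPermission implementation.
public class UmaskSketch {
    static int applyUMask(int permission, int umask) {
        return permission & ~umask; // drop the bits the umask forbids
    }

    public static void main(String[] args) {
        // 0666 requested under the common 022 umask yields 0644 (rw-r--r--)
        System.out.printf("%o%n", applyUMask(0666, 022));
    }
}
```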

2.3 DFSOutputStream#newStreamForCreate

The method above calls DFSOutputStream.newStreamForCreate, which builds the appropriate kind of DFSOutputStream (depending on the configured erasure coding policy). Before doing so, it creates an HdfsFileStatus object via an RPC to NameNodeRpcServer#create:

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
                                          FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
                                          short replication, long blockSize, Progressable progress,
                                          DataChecksum checksum, String[] favoredNodes, String ecPolicyName,
                                          String storagePolicy)
    throws IOException {
    try (TraceScope ignored =
         dfsClient.newPathTraceScope("newStreamForCreate", src)) {
        HdfsFileStatus stat = null;

        // Retry the create if we get a RetryStartFileException up to a maximum
        // number of times
        boolean shouldRetry = true;
        int retryCount = CREATE_RETRY_COUNT;
        while (shouldRetry) {
            shouldRetry = false;
            try {
                // RPC to NameNodeRpcServer#create to build the HdfsFileStatus
                // object, which describes the file entity's metadata in the cluster
                stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
                                                 new EnumSetWritable<>(flag), createParent, replication,
                                                 blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName,
                                                 storagePolicy);
                break;
            } catch (RemoteException re) {
                IOException e = re.unwrapRemoteException(
                    AccessControlException.class,
                    DSQuotaExceededException.class,
                    QuotaByStorageTypeExceededException.class,
                    FileAlreadyExistsException.class,
                    FileNotFoundException.class,
                    ParentNotDirectoryException.class,
                    NSQuotaExceededException.class,
                    RetryStartFileException.class,
                    SafeModeException.class,
                    UnresolvedPathException.class,
                    SnapshotAccessControlException.class,
                    UnknownCryptoProtocolVersionException.class);
                if (e instanceof RetryStartFileException) {
                    if (retryCount > 0) {
                        shouldRetry = true;
                        retryCount--;
                    } else {
                        throw new IOException("Too many retries because of encryption" +
                                              " zone operations", e);
                    }
                } else {
                    throw e;
                }
            }
        }
        Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
        final DFSOutputStream out;
        // pick the DFSOutputStream subclass depending on whether an
        // ErasureCodingPolicy is configured; both use the HdfsFileStatus
        // returned by the remote INodeFile creation
        if(stat.getErasureCodingPolicy() != null) {
            out = new DFSStripedOutputStream(dfsClient, src, stat,
                                             flag, progress, checksum, favoredNodes);
        } else {
            out = new DFSOutputStream(dfsClient, src, stat,
                                      flag, progress, checksum, favoredNodes, true);
        }
        // start the DataStreamer that sends packets down the DataNode pipeline
        out.start();
        return out;
    }
}
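
The bounded retry loop above can be distilled into a dependency-free sketch. RetryStartFileException is modeled as a plain exception and the simulated NameNode call is hypothetical, but the control flow mirrors the listing:

```java
import java.io.IOException;

// Model of the retry loop in newStreamForCreate: retry only on
// RetryStartFileException, up to a fixed cap, then give up with an IOException.
public class CreateRetrySketch {
    static class RetryStartFileException extends IOException {}

    static final int CREATE_RETRY_COUNT = 10;

    // attemptsBeforeSuccess: how many calls fail with RetryStartFileException
    // before the (simulated) NameNode create succeeds. Returns the number of
    // calls actually made.
    static int create(int attemptsBeforeSuccess) throws IOException {
        int calls = 0;
        boolean shouldRetry = true;
        int retryCount = CREATE_RETRY_COUNT;
        while (shouldRetry) {
            shouldRetry = false;
            calls++;
            try {
                if (calls <= attemptsBeforeSuccess) throw new RetryStartFileException();
                break; // create succeeded
            } catch (RetryStartFileException e) {
                if (retryCount > 0) {
                    shouldRetry = true;
                    retryCount--;
                } else {
                    throw new IOException(
                        "Too many retries because of encryption zone operations", e);
                }
            }
        }
        return calls;
    }

    // Non-throwing wrapper: -1 signals that the retries were exhausted.
    static int tryCreate(int attemptsBeforeSuccess) {
        try {
            return create(attemptsBeforeSuccess);
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCreate(3));  // succeeds on the 4th call
        System.out.println(tryCreate(11)); // exhausts the retry budget
    }
}
```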

2.4 Creating the HdfsFileStatus

The previous step issues an RPC to NameNodeRpcServer#create, which creates an HdfsFileStatus encapsulating the HDFS metadata of the file-system entity:

@Override // ClientProtocol
public HdfsFileStatus create(String src, FsPermission masked,
                             String clientName, EnumSetWritable<CreateFlag> flag,
                             boolean createParent, short replication, long blockSize,
                             CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
                             String storagePolicy)
    throws IOException {
    // make sure the NameNode has started up
    checkNNStartup();
    String clientMachine = getClientMachine();
    if (stateChangeLog.isDebugEnabled()) {
        stateChangeLog.debug("*DIR* NameNode.create: file "
                             +src+" for "+clientName+" at "+clientMachine);
    }
    if (!checkPathLength(src)) {
        throw new IOException("create: Pathname too long.  Limit "
                              + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    // check the operation is allowed here; only the active NameNode accepts writes
    namesystem.checkOperation(OperationCategory.WRITE);
    CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
        return (HdfsFileStatus) cacheEntry.getPayload();
    }

    HdfsFileStatus status = null;
    try {
        // build the permission object for the new file
        PermissionStatus perm = new PermissionStatus(getRemoteUser()
                                                     .getShortUserName(), null, masked);
        // FSNamesystem creates the file in the namespace and returns an HdfsFileStatus
        status = namesystem.startFile(src, perm, clientName, clientMachine,
                                      flag.get(), createParent, replication, blockSize, supportedVersions,
                                      ecPolicyName, storagePolicy, cacheEntry != null);
    } finally {
        RetryCache.setState(cacheEntry, status != null, status);
    }

    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return status;
}
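
The RetryCache short-circuit at the top of this method can be modeled as a memo table keyed by call id. The sketch below is illustrative only (the real RetryCache keys on client id plus call id and also tracks in-progress entries), but it shows why a retried RPC does not re-execute the create:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the RetryCache check in NameNodeRpcServer#create: a retried
// RPC with the same call id is served the previously computed payload instead
// of re-running the operation. Names are illustrative, not Hadoop API.
public class RetryCacheSketch {
    static final Map<Long, String> cache = new HashMap<>();
    static int executions = 0;

    static String create(long callId, String src) {
        String cached = cache.get(callId);  // waitForCompletion analogue
        if (cached != null) return cached;  // duplicate retry: short-circuit
        executions++;                       // run the real operation once
        String status = "created:" + src;
        cache.put(callId, status);          // setState analogue
        return status;
    }

    public static void main(String[] args) {
        System.out.println(create(42L, "/tmp/a"));
        System.out.println(create(42L, "/tmp/a")); // retried RPC, from cache
        System.out.println("executions=" + executions);
    }
}
```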

2.5 FSNamesystem#startFile

This method mainly forwards to its overload, startFileInt (a complaint is hard to resist here: this method is far too long and takes far too many parameters, which does not speak well of Hadoop's code quality):

private HdfsFileStatus startFileInt(String src,
                    PermissionStatus permissions, String holder, String clientMachine,
                    EnumSet<CreateFlag> flag, boolean createParent, short replication,
                    long blockSize, CryptoProtocolVersion[] supportedVersions,
                    String ecPolicyName, String storagePolicy, boolean logRetryCache)
    throws IOException {
    // changeLog/audit logging omitted here to keep the listing short
    
    // validate the path
    if (!DFSUtil.isValidName(src) ||
        FSDirectory.isExactReservedName(src) ||
        (FSDirectory.isReservedName(src)
         && !FSDirectory.isReservedRawName(src)
         && !FSDirectory.isReservedInodesName(src))) {
        throw new InvalidPathException(src);
    }

    boolean shouldReplicate = flag.contains(CreateFlag.SHOULD_REPLICATE);
    if (shouldReplicate &&
        (!org.apache.commons.lang3.StringUtils.isEmpty(ecPolicyName))) {
        throw new HadoopIllegalArgumentException("SHOULD_REPLICATE flag and " +
                                                 "ecPolicyName are exclusive parameters. Set both is not allowed!");
    }

    INodesInPath iip = null;
    boolean skipSync = true; // until we do something that might create edits
    HdfsFileStatus stat = null;
    BlocksMapUpdateInfo toRemoveBlocks = null;

    // check that a write operation is currently allowed
    checkOperation(OperationCategory.WRITE);
    final FSPermissionChecker pc = getPermissionChecker();
    FSPermissionChecker.setOperationType(null);
    writeLock();
    try {
        checkOperation(OperationCategory.WRITE);
        // refuse writes while the NameNode is in safe mode
        checkNameNodeSafeMode("Cannot create file" + src);

        // resolve the path into an INodesInPath (the chain of INodes), checking permissions along the way
        iip = FSDirWriteFileOp.resolvePathForStartFile(
            dir, pc, src, flag, createParent);


        if (blockSize < minBlockSize) {
            throw new IOException("Specified block size is less than configured" +
                                  " minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
                                  + "): " + blockSize + " < " + minBlockSize);
        }

        // verify the replication factor (or erasure coding policy) is acceptable
        if (shouldReplicate) {
            blockManager.verifyReplication(src, replication, clientMachine);
        } else {
            final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp
                .getErasureCodingPolicy(this, ecPolicyName, iip);
            if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
                checkErasureCodingSupported("createWithEC");
                if (blockSize < ecPolicy.getCellSize()) {
                    throw new IOException("Specified block size (" + blockSize
                                          + ") is less than the cell size (" + ecPolicy.getCellSize()
                                          +") of the erasure coding policy (" + ecPolicy + ").");
                }
            } else {
                blockManager.verifyReplication(src, replication, clientMachine);
            }
        }

        // handle file encryption info
        FileEncryptionInfo feInfo = null;
        if (!iip.isRaw() && provider != null) {
            EncryptionKeyInfo ezInfo = FSDirEncryptionZoneOp.getEncryptionKeyInfo(
                this, iip, supportedVersions);
            // if the path has an encryption zone, the lock was released while
            // generating the EDEK.  re-resolve the path to ensure the namesystem
            // and/or EZ has not mutated
            if (ezInfo != null) {
                checkOperation(OperationCategory.WRITE);
                iip = FSDirWriteFileOp.resolvePathForStartFile(
                    dir, pc, iip.getPath(), flag, createParent);
                feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
                    dir, iip, ezInfo);
            }
        }

        skipSync = false; // following might generate edits
        toRemoveBlocks = new BlocksMapUpdateInfo();
        dir.writeLock();
        try {
            // create the INodeFile
            stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
                                              clientMachine, flag, createParent, replication, blockSize, feInfo,
                                              toRemoveBlocks, shouldReplicate, ecPolicyName, storagePolicy,
                                              logRetryCache);
        } catch (IOException e) {
            skipSync = e instanceof StandbyException;
            throw e;
        } finally {
            dir.writeUnlock();
        }
    } finally {
        writeUnlock("create");
        // There might be transactions logged while trying to recover the lease.
        // They need to be sync'ed even when an exception was thrown.
        if (!skipSync) {
            getEditLog().logSync();
            if (toRemoveBlocks != null) {
                removeBlocks(toRemoveBlocks);
                toRemoveBlocks.clear();
            }
        }
    }

    return stat;
}
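
The two block-size checks in the listing (the configured minimum, and the erasure-coding cell size) can be sketched as a small validation helper. The 1 MB minimum below matches the dfs.namenode.fs-limits.min-block-size default; the class and method names are illustrative:

```java
// Sketch of the block-size validations in startFileInt. The 1 MB minimum is
// the dfs.namenode.fs-limits.min-block-size default; names are stand-ins,
// not Hadoop API.
public class BlockSizeChecks {
    static final long MIN_BLOCK_SIZE = 1024L * 1024;

    // Reject blocks below the configured minimum and, for erasure-coded
    // files, below the EC policy's cell size (pass 0 for replicated files).
    static void validate(long blockSize, long ecCellSizeOrZero) {
        if (blockSize < MIN_BLOCK_SIZE) {
            throw new IllegalArgumentException(
                "Specified block size is less than configured minimum: " + blockSize);
        }
        if (ecCellSizeOrZero > 0 && blockSize < ecCellSizeOrZero) {
            throw new IllegalArgumentException(
                "Specified block size is less than the EC cell size: " + blockSize);
        }
    }

    // Convenience wrapper returning a boolean instead of throwing.
    static boolean isValid(long blockSize, long ecCellSizeOrZero) {
        try {
            validate(blockSize, ecCellSizeOrZero);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid(128L * 1024 * 1024, 0)); // default-sized block
        System.out.println(isValid(512L * 1024, 0));        // below the minimum
    }
}
```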

2.6 Creating the File

2.6.1 FSDirWriteFileOp.startFile

This method creates a file that does not yet exist, or overwrites an existing one:

static HdfsFileStatus startFile(
    FSNamesystem fsn, INodesInPath iip,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize,
    FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
    boolean shouldReplicate, String ecPolicyName, String storagePolicy,
    boolean logRetryEntry)
    throws IOException {
    assert fsn.hasWriteLock();
    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
    boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);

    final String src = iip.getPath();
    FSDirectory fsd = fsn.getFSDirectory();

    if (iip.getLastINode() != null) {
        // if overwriting is allowed, delete the existing file first
        if (overwrite) {
            List<INode> toRemoveINodes = new ChunkedArrayList<>();
            List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
            long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
                                            toRemoveINodes, toRemoveUCFiles, now());
            if (ret >= 0) {
                iip = INodesInPath.replace(iip, iip.length() - 1, null);
                FSDirDeleteOp.incrDeletedFileCount(ret);
                fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
            }
        } else {
            // If lease soft limit time is expired, recover the lease
            fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
                                     src, holder, clientMachine, false);
            throw new FileAlreadyExistsException(src + " for client " +
                                                 clientMachine + " already exists");
        }
    }
    fsn.checkFsObjectLimit();
    INodeFile newNode = null;
    // create any missing ancestor directories
    INodesInPath parent =
        FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
    if (parent != null) {
        // add the file with the given name into the namespace
        iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
                      replication, blockSize, holder, clientMachine, shouldReplicate,
                      ecPolicyName, storagePolicy);
        newNode = iip != null ? iip.getLastINode().asFile() : null;
    }
    if (newNode == null) {
        throw new IOException("Unable to add " + src +  " to namespace");
    }
    fsn.leaseManager.addLease(
        newNode.getFileUnderConstructionFeature().getClientName(),
        newNode.getId());
    if (feInfo != null) {
        FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo,
                                                    XAttrSetFlag.CREATE);
    }
    setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
    fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
    if (NameNode.stateChangeLog.isDebugEnabled()) {
        NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
                                      src + " inode " + newNode.getId() + " " + holder);
    }
    return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
}
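
The overwrite-or-fail branch at the top of startFile can be modeled with a toy namespace. Everything here is a stand-in (a Set instead of the inode tree, result strings instead of exceptions), but the decision logic matches the listing:

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the branch at the top of FSDirWriteFileOp.startFile: if the
// target already exists it is either deleted (OVERWRITE set) or the create
// fails with FileAlreadyExistsException. Names are illustrative stand-ins.
public class OverwriteSketch {
    static final Set<String> namespace = new HashSet<>();

    static String startFile(String src, boolean overwrite) {
        if (namespace.contains(src)) {
            if (overwrite) {
                namespace.remove(src); // FSDirDeleteOp.delete analogue
            } else {
                return "FileAlreadyExistsException:" + src;
            }
        }
        namespace.add(src); // addFile analogue
        return "created:" + src;
    }

    public static void main(String[] args) {
        System.out.println(startFile("/tmp/f", false)); // fresh create
        System.out.println(startFile("/tmp/f", false)); // exists, no overwrite
        System.out.println(startFile("/tmp/f", true));  // exists, overwritten
    }
}
```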

2.6.2 FSDirWriteFileOp#addFile

This method builds the new INodeFile and, via addINode, inserts it into the FSDirectory, the in-memory object that backs the namespace:

private static INodesInPath addFile(
    FSDirectory fsd, INodesInPath existing, byte[] localName,
    PermissionStatus permissions, short replication, long preferredBlockSize,
    String clientName, String clientMachine, boolean shouldReplicate,
    String ecPolicyName, String storagePolicy) throws IOException {

    Preconditions.checkNotNull(existing);
    long modTime = now();
    INodesInPath newiip;
    fsd.writeLock();
    try {
        boolean isStriped = false;
        ErasureCodingPolicy ecPolicy = null;
        byte storagepolicyid = 0;
        if (storagePolicy != null && !storagePolicy.isEmpty()) {
            BlockStoragePolicy policy =
                fsd.getBlockManager().getStoragePolicy(storagePolicy);
            if (policy == null) {
                throw new HadoopIllegalArgumentException(
                    "Cannot find a block policy with the name " + storagePolicy);
            }
            storagepolicyid = policy.getId();
        }
        if (!shouldReplicate) {
            ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
                fsd.getFSNamesystem(), ecPolicyName, existing);
            if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
                isStriped = true;
            }
        }
        final BlockType blockType = isStriped ?
            BlockType.STRIPED : BlockType.CONTIGUOUS;
        final Short replicationFactor = (!isStriped ? replication : null);
        final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
        // build a new INodeFile from the given permissions and metadata
        INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
                                         modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
                                         storagepolicyid, blockType);
        newNode.setLocalName(localName);
        newNode.toUnderConstruction(clientName, clientMachine);
        // add the INode (an INodeFile or an INodeDirectory) into the namespace (FSDirectory)
        newiip = fsd.addINode(existing, newNode, permissions.getPermission());
    } finally {
        fsd.writeUnlock();
    }
    if (newiip == null) {
        NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
                                     existing.getPath() + "/" + DFSUtil.bytes2String(localName));
        return null;
    }

    if(NameNode.stateChangeLog.isDebugEnabled()) {
        NameNode.stateChangeLog.debug("DIR* addFile: " +
                                      DFSUtil.bytes2String(localName) + " is added");
    }
    return newiip;
}
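
The striped-versus-contiguous decision in addFile reduces to a small predicate. This sketch restates that logic with boolean inputs (the method name and enum are illustrative stand-ins):

```java
// Model of the block-type decision in addFile: a file is STRIPED only when
// it is not plainly replicated and a non-replication EC policy applies.
public class BlockTypeSketch {
    enum BlockType { STRIPED, CONTIGUOUS }

    static BlockType blockTypeFor(boolean shouldReplicate, boolean hasEcPolicy,
                                  boolean ecPolicyIsReplication) {
        boolean isStriped = !shouldReplicate && hasEcPolicy && !ecPolicyIsReplication;
        return isStriped ? BlockType.STRIPED : BlockType.CONTIGUOUS;
    }

    public static void main(String[] args) {
        System.out.println(blockTypeFor(false, true, false)); // EC file: striped
        System.out.println(blockTypeFor(true, true, false));  // SHOULD_REPLICATE wins
        System.out.println(blockTypeFor(false, false, false));// no EC policy
    }
}
```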

2.7 Summary

To summarize the path from DistributedFileSystem#create to the returned stream: DistributedFileSystem#create resolves the path and delegates to DFSClient#create, which uses DFSOutputStream.newStreamForCreate to RPC into NameNodeRpcServer#create. On the NameNode, FSNamesystem#startFile and FSDirWriteFileOp register a new INodeFile in the namespace and return an HdfsFileStatus, from which the client builds and starts a DFSOutputStream, finally wrapped in an FSDataOutputStream.
