
2021SC@SDUSC HBase (Part 7) Project Code Analysis - flush


2021SC@SDUSC

The previous articles covered two flush processes: the flush of the MemStore on an HRegion, and the flush of MemStores on an HRegionServer. This article examines when HBase initiates a MemStore flush.

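As covered earlier, requestFlush() and requestDelayedFlush() add flush requests to MemStoreFlusher's flushQueue. The internal group of FlushHandler threads then consumes the queue and processes each HRegion that needs flushing. This means that the call sites of these two MemStoreFlusher methods account for some of the points where a MemStore flush is initiated. Other operations and internal procedures trigger a flush by calling HRegion's flushcache() directly. The rest of this article summarizes the operations and internal procedures that trigger a flush, along with the conditions that decide whether one happens.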

I. Adding a flush request to MemStoreFlusher's flushQueue

1. A single Put

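HRegion handles a Put in its put(Put put) method. Before the operation starts, that method calls checkResources() to check whether the HRegion's MemStore size has exceeded a threshold. If it has, checkResources() calls requestFlush() to request a flush of this HRegion's MemStore. It then throws a RegionTooBusyException, which stops the operation. The code of checkResources() is as follows: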

  private void checkResources()
    throws RegionTooBusyException {
    // If catalog region, do not impose resource constraints or block updates.
    if (this.getRegionInfo().isMetaRegion()) return;
    if (this.memstoreSize.get() > this.blockingMemStoreSize) {
      blockedRequestsCount.increment();
      requestFlush();
      
      throw new RegionTooBusyException("Above memstore limit, " +
          "regionName=" + (this.getRegionInfo() == null ? "unknown" :
          this.getRegionInfo().getRegionNameAsString()) +
          ", server=" + (this.getRegionServerServices() == null ? "unknown" :
          this.getRegionServerServices().getServerName()) +
          ", memstoreSize=" + memstoreSize.get() +
          ", blockingMemStoreSize=" + blockingMemStoreSize);
    }
  }

The method first checks whether this is the meta region. If it is, no resource constraints are imposed and updates are never blocked.

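If the region's current MemStore size exceeds the blocking threshold, the method does three things. It increments the blocked-requests counter, requests a MemStore flush, and throws a RegionTooBusyException to reject the operation.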

memstoreSize is the current size of the HRegion's MemStore. Put and the other write operations keep it up to date in real time by calling addAndGetGlobalMemstoreSize():

  
  public long addAndGetGlobalMemstoreSize(long memStoreSize) {
    if (this.rsAccounting != null) {
      rsAccounting.addAndGetGlobalMemstoreSize(memStoreSize);
    }
    return this.memstoreSize.addAndGet(memStoreSize);
  }
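The method above does two-level accounting. Each size delta is added to the server-wide total, when the region has an rsAccounting, and also to the region's own counter. The sketch below models this with AtomicLong. ServerAccounting and Region are hypothetical stand-ins for RegionServerAccounting and HRegion, not the HBase classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of addAndGetGlobalMemstoreSize()'s two-level accounting.
public class MemstoreAccounting {
    // Stand-in for RegionServerAccounting: one total shared by all regions on a server.
    static class ServerAccounting {
        final AtomicLong globalMemstoreSize = new AtomicLong();

        long addAndGetGlobalMemstoreSize(long delta) {
            return globalMemstoreSize.addAndGet(delta);
        }
    }

    // Stand-in for HRegion: keeps its own size and forwards deltas to the server.
    static class Region {
        final AtomicLong memstoreSize = new AtomicLong();
        final ServerAccounting rsAccounting; // may be null; the original checks for this

        Region(ServerAccounting rsAccounting) {
            this.rsAccounting = rsAccounting;
        }

        long addAndGetGlobalMemstoreSize(long delta) {
            if (rsAccounting != null) {
                rsAccounting.addAndGetGlobalMemstoreSize(delta);
            }
            // The region-level value is what checkResources() compares against.
            return memstoreSize.addAndGet(delta);
        }
    }

    public static void main(String[] args) {
        ServerAccounting server = new ServerAccounting();
        Region a = new Region(server);
        Region b = new Region(server);
        a.addAndGetGlobalMemstoreSize(100);
        b.addAndGetGlobalMemstoreSize(50);
        System.out.println(a.memstoreSize.get() + " " + b.memstoreSize.get() + " "
                + server.globalMemstoreSize.get());
    }
}
```

Running main prints the two region sizes and their server-wide sum, which is 150.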

blockingMemStoreSize is a MemStore threshold set on the HRegion. Once the MemStore size exceeds it, data updates are blocked. It is set in setHTableSpecificConf(), which is called while the HRegion is being constructed:

    this.blockingMemStoreSize = this.memstoreFlushSize *
        conf.getLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
                HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);

As the code shows, it is computed as memstoreFlushSize multiplied by a factor, which defaults to 4 when not configured.
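The arithmetic below gives a sense of scale. It assumes a flush size of 128 MB and a multiplier of 4. The 128 MB figure is assumed to match the HBase 1.x default for `hbase.hregion.memstore.flush.size`; treat both numbers as assumptions, not values read from a cluster.

```java
// Sketch: relation between the flush threshold and the blocking threshold.
public class MemStoreThresholds {
    // Assumed values for illustration: 128 MB flush size, multiplier 4.
    static final long ASSUMED_FLUSH_SIZE = 128L * 1024 * 1024;
    static final long ASSUMED_BLOCK_MULTIPLIER = 4;

    // Mirrors: blockingMemStoreSize = memstoreFlushSize * multiplier
    static long blockingMemStoreSize(long memstoreFlushSize, long multiplier) {
        return memstoreFlushSize * multiplier;
    }

    public static void main(String[] args) {
        long blocking = blockingMemStoreSize(ASSUMED_FLUSH_SIZE, ASSUMED_BLOCK_MULTIPLIER);
        System.out.println(blocking / (1024 * 1024) + " MB"); // prints "512 MB"
    }
}
```

Under these assumptions, a region starts requesting flushes once its MemStore passes 128 MB. It only begins rejecting writes with RegionTooBusyException after passing 512 MB.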

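What, then, is memstoreFlushSize? It is another threshold on the HRegion: when the MemStore size exceeds it, a flush request is issued. The value comes from the table first, so each table can set its own memstoreFlushSize. If the table's value is not positive, the HConstants.HREGION_MEMSTORE_FLUSH_SIZE setting is used instead, falling back to HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE: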

    if (this.htableDescriptor == null) return;
    long flushSize = this.htableDescriptor.getMemStoreFlushSize();
 
    if (flushSize <= 0) {
      flushSize = conf.getLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE,
        HTableDescriptor.DEFAULT_MEMSTORE_FLUSH_SIZE);
    }
    this.memstoreFlushSize = flushSize;
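The precedence above can be sketched as a small standalone method. In this sketch a plain Map stands in for HBase's Configuration, and the 128 MB fallback is an assumed default. The key string `hbase.hregion.memstore.flush.size` is the one HConstants.HREGION_MEMSTORE_FLUSH_SIZE refers to.

```java
import java.util.Map;

// Sketch of how memstoreFlushSize is resolved: table setting, then config, then default.
// The Map is a hypothetical stand-in for org.apache.hadoop.conf.Configuration.
public class FlushSizeResolver {
    static final String FLUSH_SIZE_KEY = "hbase.hregion.memstore.flush.size";
    static final long DEFAULT_FLUSH_SIZE = 128L * 1024 * 1024; // assumed default

    /** tableFlushSize <= 0 means the table did not set its own value. */
    static long resolve(long tableFlushSize, Map<String, Long> conf) {
        if (tableFlushSize > 0) {
            return tableFlushSize; // 1. per-table setting wins
        }
        // 2. cluster-wide configuration, 3. built-in default
        return conf.getOrDefault(FLUSH_SIZE_KEY, DEFAULT_FLUSH_SIZE);
    }

    public static void main(String[] args) {
        Map<String, Long> conf = Map.of(FLUSH_SIZE_KEY, 256L * 1024 * 1024);
        System.out.println(resolve(64L * 1024 * 1024, conf)); // table value
        System.out.println(resolve(0, conf));                 // configured value
        System.out.println(resolve(0, Map.of()));             // default
    }
}
```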

Next, let's look at how requestFlush() sends the request:

  private void requestFlush() {
    if (this.rsServices == null) {
      return;
    }

    synchronized (writestate) {
      if (this.writestate.isFlushRequested()) {
        return;
      }
      writestate.flushRequested = true;
    }

    this.rsServices.getFlushRequester().requestFlush(this);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Flush requested on " + this);
    }
  }

The logic is as follows:

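  1. If the HRegion's rsServices is null, return immediately.
  2. Check writestate. If a flush has already been requested (flushRequested is true), return to avoid a duplicate request. Otherwise, set writestate.flushRequested to true. Then obtain the FlushRequester through rsServices and call its requestFlush() method with this HRegion to issue the flush request.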
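The two steps above follow a "check and set a flag under a lock, then enqueue outside the lock" pattern. The sketch below isolates that pattern. WriteState, Region and the list-based queue are hypothetical stand-ins for HRegion's writestate and MemStoreFlusher's flushQueue, not HBase classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of requestFlush()'s de-duplication of flush requests.
public class FlushRequestDedup {
    static class WriteState {
        boolean flushRequested = false;
    }

    static class Region {
        final String name;
        final WriteState writestate = new WriteState();
        final List<Region> flushQueue; // stand-in for the FlushRequester's queue

        Region(String name, List<Region> flushQueue) {
            this.name = name;
            this.flushQueue = flushQueue;
        }

        void requestFlush() {
            synchronized (writestate) {
                if (writestate.flushRequested) {
                    return; // a flush is already pending; do not enqueue again
                }
                writestate.flushRequested = true;
            }
            // Enqueue outside the lock, as the original does.
            flushQueue.add(this);
        }
    }

    public static void main(String[] args) {
        List<Region> queue = Collections.synchronizedList(new ArrayList<>());
        Region r = new Region("r1", queue);
        r.requestFlush();
        r.requestFlush(); // ignored: flushRequested is already true
        System.out.println(queue.size()); // prints 1
    }
}
```

The sketch never clears the flag. In HBase the region resets flushRequested once the flush has run, which lets later requests through again.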

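Back in put(), the Put is executed through doBatchMutate() and then two overloads of batchMutate(). batchMutate() is designed for batches of Puts, Deletes and similar mutations; a single Put is simply passed in as an array holding one operation. The method loops until the whole batch is done, and each iteration runs four steps:

  1. Call checkResources() to check the MemStore.
  2. Call doMiniBatchMutation() to apply a mini-batch, which returns the size it added.
  3. Call addAndGetGlobalMemstoreSize() to update the HRegion's MemStore size, which yields newSize.
  4. Use isFlushSize() to decide whether to call requestFlush().

The code is as follows: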

  /**
   * Perform a batch of mutations.
   * It supports only Put and Delete mutations and will ignore other types passed.
   * @param batchOp contains the list of mutations
   * @return an array of OperationStatus which internally contains the
   *         OperationStatusCode and the exceptionMessage if any.
   * @throws IOException
   */
  OperationStatus[] batchMutate(BatchOperationInProgress batchOp) throws IOException {
    boolean initialized = false;
    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
    startRegionOperation(op);
    try {
      while (!batchOp.isDone()) {
        if (!batchOp.isInReplay()) {
          checkReadonly();
        }
        checkResources();
 
        if (!initialized) {
          this.writeRequestsCount.add(batchOp.operations.length);
          if (!batchOp.isInReplay()) {
            doPreMutationHook(batchOp);
          }
          initialized = true;
        }
        long addedSize = doMiniBatchMutation(batchOp);

        long newSize = this.addAndGetGlobalMemstoreSize(addedSize);
        
        if (isFlushSize(newSize)) {
          requestFlush();
        }
      }
    } finally {
      closeRegionOperation(op);
    }
    
    return batchOp.retCodeDetails;
  }

isFlushSize() makes the decision by checking whether the current MemStore size, newSize, exceeds memstoreFlushSize:

  private boolean isFlushSize(final long size) {
    return size > this.memstoreFlushSize;
  }
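The loop above puts two checks around each mini-batch. A blocking check runs before the write, and a flush-size check runs after it. The sketch below reproduces that control flow with small numbers so both thresholds are visible. SimpleRegion and its counters are hypothetical. RegionTooBusyException is redefined locally as an unchecked exception for brevity; HBase's own class is an IOException. Unlike HBase, the sketch counts every requestFlush() call and does no de-duplication.

```java
// Sketch of the blocking check and the flush check around each mini-batch.
public class BatchFlushCheck {
    static class RegionTooBusyException extends RuntimeException {
        RegionTooBusyException(String msg) { super(msg); }
    }

    static class SimpleRegion {
        final long memstoreFlushSize;
        final long blockingMemStoreSize;
        long memstoreSize = 0;
        int flushRequests = 0; // counts requestFlush() calls (no de-duplication here)

        SimpleRegion(long flushSize, long multiplier) {
            this.memstoreFlushSize = flushSize;
            this.blockingMemStoreSize = flushSize * multiplier;
        }

        // Mirrors checkResources(): reject writes above the blocking threshold.
        void checkResources() {
            if (memstoreSize > blockingMemStoreSize) {
                flushRequests++;
                throw new RegionTooBusyException("Above memstore limit, memstoreSize=" + memstoreSize);
            }
        }

        // Mirrors one loop iteration: check, apply, then maybe request a flush.
        void applyMiniBatch(long addedSize) {
            checkResources();
            memstoreSize += addedSize;              // addAndGetGlobalMemstoreSize()
            if (memstoreSize > memstoreFlushSize) { // isFlushSize()
                flushRequests++;                    // requestFlush()
            }
        }
    }

    public static void main(String[] args) {
        SimpleRegion region = new SimpleRegion(100, 4); // flush above 100, block above 400
        region.applyMiniBatch(60);  // size 60: no flush
        region.applyMiniBatch(60);  // size 120: flush requested
        region.applyMiniBatch(300); // size 420: flush requested again
        try {
            region.applyMiniBatch(10); // 420 > 400: rejected before applying
        } catch (RegionTooBusyException e) {
            System.out.println("blocked: " + e.getMessage());
        }
        System.out.println(region.memstoreSize + " " + region.flushRequests); // prints "420 3"
    }
}
```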

That completes the single Put path.

2. A single Delete

This is much the same as a single Put:

//
  // set() methods for client use.
  //
  
  public void delete(Delete delete)
  throws IOException {
    checkReadonly();
    checkResources();
    startRegionOperation(Operation.DELETE);
    try {
      delete.getRow();
      // All edits for the given row (across all column families) must happen atomically.
      doBatchMutate(delete);
    } finally {
      closeRegionOperation(Operation.DELETE);
    }
  }

Here too, checkResources() is called first to check the MemStore, and then doBatchMutate() does the actual work.

3. checkAndMutate / checkAndRowMutate

4. A single Append

5. A single Increment

6. Batch operations

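In summary, every operation that updates data checks the MemStore first. If the MemStore is above the blocking threshold, the operation sends a flush request and throws an exception, which blocks it. After the operation completes, the code checks how much the MemStore has grown. If the flush condition has been reached, it sends a flush request.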

II. Calling HRegion's flushcache() directly

1. External triggers such as the command line

This path starts in the flushRegion() method of RSRpcServices on the RegionServer:

  
  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public FlushRegionResponse flushRegion(final RpcController controller,
      final FlushRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      
      requestCount.increment();
      
      HRegion region = getRegion(request.getRegion());
      LOG.info("Flushing " + region.getRegionNameAsString());
      boolean shouldFlush = true;

      if (request.hasIfOlderThanTs()) {
        shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
      }
      
      FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
      
      if (shouldFlush) {
        long startTime = EnvironmentEdgeManager.currentTime();
        HRegion.FlushResult flushResult = region.flushcache();
        if (flushResult.isFlushSucceeded()) {
          long endTime = EnvironmentEdgeManager.currentTime();
          regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
        }
        boolean result = flushResult.isCompactionNeeded();
        if (result) {
          regionServer.compactSplitThread.requestSystemCompaction(region,
            "Compaction through user triggered flush");
        }
        builder.setFlushed(result);
      }
      builder.setLastFlushTime(region.getLastFlushTime());
      return builder.build();
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server.
      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
      throw new ServiceException(ex);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

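The method first decides whether to flush. If the request carries ifOlderThanTs, the method flushes only when the region's last flush time is older than that timestamp; otherwise it always flushes. When it does flush, it calls HRegion's flushcache() on the region's MemStore. If the flush result says a compaction is needed, the method also requests a system compaction.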
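The shouldFlush decision in flushRegion() can be isolated as a pure function. The sketch below uses a nullable Long in place of the protobuf `hasIfOlderThanTs()` / `getIfOlderThanTs()` pair; that signature is an assumption made for illustration.

```java
// Sketch of flushRegion()'s decision on whether to flush.
public class FlushDecision {
    // ifOlderThanTs == null plays the role of !request.hasIfOlderThanTs()
    static boolean shouldFlush(long lastFlushTime, Long ifOlderThanTs) {
        if (ifOlderThanTs == null) {
            return true; // no condition attached: always flush
        }
        return lastFlushTime < ifOlderThanTs;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlush(1000L, null));  // true
        System.out.println(shouldFlush(1000L, 2000L)); // true: last flush is older
        System.out.println(shouldFlush(3000L, 2000L)); // false: already flushed recently
    }
}
```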

2. Region merge

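RSRpcServices has a mergeRegions() method that runs when regions are merged. It calls flushcache() on regionA and then on regionB to flush each region's MemStore. Only after both flushes does it request the merge through compactSplitThread.requestRegionsMerge():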

      long startTime = EnvironmentEdgeManager.currentTime();
      
      HRegion.FlushResult flushResult = regionA.flushcache();
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      startTime = EnvironmentEdgeManager.currentTime();
      
      flushResult = regionB.flushcache();
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
          masterSystemTime);
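flushRegion(), mergeRegions() and splitRegion() all repeat the same pattern: time the flush, and record the flush time only if the flush succeeded. The helper below factors that pattern out as an illustration. timedFlush is a hypothetical name, since HBase inlines this code at each call site. The functional interfaces stand in for `region.flushcache().isFlushSucceeded()` and `metricsRegionServer.updateFlushTime()`, and System.currentTimeMillis() replaces EnvironmentEdgeManager.currentTime().

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

// Sketch: "time the flush, record the metric only on success".
public class TimedFlush {
    static boolean timedFlush(BooleanSupplier flush, LongConsumer updateFlushTime) {
        long start = System.currentTimeMillis();
        boolean succeeded = flush.getAsBoolean(); // stands in for isFlushSucceeded()
        if (succeeded) {
            updateFlushTime.accept(System.currentTimeMillis() - start);
        }
        return succeeded;
    }

    public static void main(String[] args) {
        int[] updates = {0};
        LongConsumer metric = ms -> updates[0]++;
        // A merge flushes regionA, then regionB, timing each separately.
        timedFlush(() -> true, metric);
        timedFlush(() -> false, metric); // failed flush: no metric update
        System.out.println("metric updates: " + updates[0]); // prints "metric updates: 1"
    }
}
```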

3. Region split

RSRpcServices has a splitRegion() method that runs when a region is split. It likewise calls flushcache() first to flush the region's MemStore:

  @Override
  @QosPriority(priority=HConstants.ADMIN_QOS)
  public SplitRegionResponse splitRegion(final RpcController controller,
      final SplitRegionRequest request) throws ServiceException {
    try {
      
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      region.startRegionOperation(Operation.SPLIT_REGION);
      LOG.info("Splitting " + region.getRegionNameAsString());
      
      long startTime = EnvironmentEdgeManager.currentTime();
      
      HRegion.FlushResult flushResult = region.flushcache();
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      byte[] splitPoint = null;
      if (request.hasSplitPoint()) {
        splitPoint = request.getSplitPoint().toByteArray();
      }

      region.forceSplit(splitPoint);

      regionServer.compactSplitThread.requestSplit(region, region.checkSplit());
      return SplitRegionResponse.newBuilder().build();
    } catch (DroppedSnapshotException ex) {
      regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
      throw new ServiceException(ex);
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }
  }

4. Bulk loading HFiles

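Bulk load is a fast, practical way for HBase to store data by loading HFiles directly. In HRegion's bulkLoadHFiles() method, when assignSeqId is true, flushcache() is called to flush the HRegion's MemStore: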

      if (assignSeqId) {
        FlushResult fs = this.flushcache();
        if (fs.isFlushSucceeded()) {
          seqId = fs.flushSequenceId;
        } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
          seqId = fs.flushSequenceId;
        } else {
          throw new IOException("Could not bulk load with an assigned sequential ID because the " +
              "flush didn't run. Reason for not flushing: " + fs.failureReason);
        }
      }
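The snippet accepts two outcomes: a successful flush, and a flush skipped because the MemStore was empty. In both cases it takes flushSequenceId as the bulk load's sequence id. Every other outcome aborts the load. The sketch below restates that decision. FlushOutcome is a hypothetical, simplified enum, not HBase's FlushResult.Result.

```java
import java.io.IOException;

// Sketch of how bulkLoadHFiles() picks a sequence id when assignSeqId is true.
public class BulkLoadSeqId {
    // Hypothetical simplification of the possible flush outcomes.
    enum FlushOutcome { FLUSHED, CANNOT_FLUSH_MEMSTORE_EMPTY, CANNOT_FLUSH }

    static long chooseSeqId(FlushOutcome outcome, long flushSequenceId, String failureReason)
            throws IOException {
        switch (outcome) {
            case FLUSHED:
            case CANNOT_FLUSH_MEMSTORE_EMPTY:
                // The flush ran, or there was nothing to flush: the id is usable either way.
                return flushSequenceId;
            default:
                throw new IOException("Could not bulk load with an assigned sequential ID because the "
                        + "flush didn't run. Reason for not flushing: " + failureReason);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(chooseSeqId(FlushOutcome.FLUSHED, 42L, null)); // prints 42
    }
}
```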

5. Taking a table snapshot

Taking a table snapshot flushes the MemStores of the regions in that table. RegionSnapshotTask, in the FlushSnapshotSubprocedure class, does this work. In its call() method, when snapshotSkipFlush is false, it calls HRegion's flushcache() to flush the MemStore:

        if (snapshotSkipFlush) {
          LOG.debug("take snapshot without flush memstore first");
        } else {
          LOG.debug("Flush Snapshotting region " + region.toString() + " started...");
          region.flushcache();
        }

That covers the points at which a MemStore flush is initiated and the conditions that trigger it. Corrections are welcome.
