栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

2021SC@SDUSC Hbase(十三)项目代码分析-Nonce

2021SC@SDUSC Hbase(十三)项目代码分析-Nonce

2021SC@SDUSC

目录

一、前言

二、Hbase在这方面是如何实现的


一、前言

        问题背景:

                客户端发送RPC请求给服务端时,因为各种原因,服务器相应可能超时。如果客户端盲目等待,针对数据的操作,很有可能服务器端已经处理完毕但是没法办法通知客户端,此时客户端只能再次发送请求,可是有可能造成服务器重复处理请求的问题。

        解决方案:

                客户端发送RPC请求时,如果响应超时会重复发送直至达到重试次数上限。另外客户端第一次发送和重发请求时会附带相同的nonce,服务端只需要根据nonce进行判断就能知道是否是同一请求,并根据之前请求处理的结果进行判断,决定是等待、拒绝还是处理。

二、Hbase在这方面是如何实现的

        在HRegionServer中,有一个ServerNonceManager类型的成员变量nonceManager,由他负责管理该regionServer上的nonce:

final ServerNonceManager nonceManager;

serverNonceManager中有一个方法用于当一个操作在服务端执行后未及时响应给客户端,客户端重新发送携带相同nonceGroup和nonce的同一操作的请求时,服务器根据nonceGroup和nonce做相应的判断:

  public boolean startOperation(long group, long nonce, Stoppable stoppable)
      throws InterruptedException {
    if (nonce == HConstants.NO_NONCE) return true;
    NonceKey nk = new NonceKey(group, nonce);
    OperationContext ctx = new OperationContext();
    while (true) {
      OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
      if (oldResult == null) return true;
 
      // Collision with some operation - should be extremely rare.
      synchronized (oldResult) {
        int oldState = oldResult.getState();
        LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult);
        if (oldState != OperationContext.WAIT) {
          return oldState == OperationContext.PROCEED; // operation ended
        }
        oldResult.setHasWait();
        oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop
        
        if (stoppable.isStopped()) {
          throw new InterruptedException("Server stopped");
        }
      }
    }
  }

处理逻辑如下:

  1. 首先判断传入的nonce是否为HConstants.NO_NONCE,是则直接返回true,表示操作可以进行
  2. 构建nonceKey实例nk和operationContext实例ctx
  3. 循环中,将nonceKey到operationContext的映射,添加到ConcurrentHashMap类型的nonces中去
  4. 如果之前没有,说明该操作可以直接执行
  5. 如果之前存在该操作,则获得该操作nonce的OperationContext状态如果状态不是WAIT
    1. 如果之前的状态时PROCEED,说明之前的操作执行完成且失败了,此处返回true,表示操作可以执行
    2. 等待一定时间后继续循环

  在RSRpcServices的append()方法中,有如下代码:

if (r == null) {
      long nonce = startNonceOperation(m, nonceGroup);
      boolean success = false;
      try {
        r = region.append(append, nonceGroup, nonce);
        success = true;
      } finally {
        endNonceOperation(m, nonceGroup, success);
      }
      if (region.getCoprocessorHost() != null) {
        region.getCoprocessorHost().postAppend(append, r);
      }
    }

其中startNonceOperation()方法代码如下:

  private long startNonceOperation(final MutationProto mutation, long nonceGroup)
      throws IOException, OperationConflictException {

    if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
    boolean canProceed = false;
    try {
      canProceed = regionServer.nonceManager.startOperation(
        nonceGroup, mutation.getNonce(), regionServer);
    } catch (InterruptedException ex) {
      throw new InterruptedIOException("Nonce start operation interrupted");
    }
    
    if (!canProceed) {
      // TODO: instead, we could convert append/increment to get w/mvcc
      String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
        + "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
        + "] may have already completed";
      throw new OperationConflictException(message);
    }
    
    return mutation.getNonce();
  }

处理逻辑如下:

  1. 如果RegionServer上的nonceManager为null,或者该nutation不存在nonce,直接返回HConstants
  2. 设置标志位,表示是否可以运行
  3. 调用RegionServer上nonceManager的startOperation()方法,确定是否可以执行该操作
  4. 如果不能运行,抛出OperationConflictException异常
  5. 返回mutation的nonce

如步骤三,此方法会调用RegionServer上nonceManaget的startOperation()方案,确定是否可以执行该操作。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/679938.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号