2021SC@SDUSC
目录
一、前言
二、Hbase在这方面是如何实现的
一、前言
问题背景:
客户端发送RPC请求给服务端时,因为各种原因,服务器相应可能超时。如果客户端盲目等待,针对数据的操作,很有可能服务器端已经处理完毕但是没法办法通知客户端,此时客户端只能再次发送请求,可是有可能造成服务器重复处理请求的问题。
解决方案:
客户端发送RPC请求时,如果响应超时会重复发送直至达到重试次数上限。另外客户端第一次发送和重发请求时会附带相同的nonce,服务端只需要根据nonce进行判断就能知道是否是同一请求,并根据之前请求处理的结果进行判断,决定是等待、拒绝还是处理。
二、Hbase在这方面是如何实现的
在HRegionServer中,有一个ServerNonceManager类型的成员变量nonceManager,由他负责管理该regionServer上的nonce:
final ServerNonceManager nonceManager;
serverNonceManager中有一个方法用于当一个操作在服务端执行后未及时响应给客户端,客户端重新发送携带相同nonceGroup和nonce的同一操作的请求时,服务器根据nonceGroup和nonce做相应的判断:
public boolean startOperation(long group, long nonce, Stoppable stoppable)
throws InterruptedException {
if (nonce == HConstants.NO_NONCE) return true;
NonceKey nk = new NonceKey(group, nonce);
OperationContext ctx = new OperationContext();
while (true) {
OperationContext oldResult = nonces.putIfAbsent(nk, ctx);
if (oldResult == null) return true;
// Collision with some operation - should be extremely rare.
synchronized (oldResult) {
int oldState = oldResult.getState();
LOG.debug("Conflict detected by nonce: " + nk + ", " + oldResult);
if (oldState != OperationContext.WAIT) {
return oldState == OperationContext.PROCEED; // operation ended
}
oldResult.setHasWait();
oldResult.wait(this.conflictWaitIterationMs); // operation is still active... wait and loop
if (stoppable.isStopped()) {
throw new InterruptedException("Server stopped");
}
}
}
}
处理逻辑如下:
- 首先判断传入的nonce是否为HConstants.NO_NONCE,是则直接返回true,表示操作可以进行
- 构建nonceKey实例nk和operationContext实例ctx
- 循环中,将nonceKey到operationContext的映射,添加到ConcurrentHashMap类型的nonces中去
- 如果之前没有,说明该操作可以直接执行
- 如果之前存在该操作,则获得该操作nonce的OperationContext状态如果状态不是WAIT
- 如果之前的状态时PROCEED,说明之前的操作执行完成且失败了,此处返回true,表示操作可以执行
- 等待一定时间后继续循环
在RSRpcServices的append()方法中,有如下代码:
if (r == null) {
long nonce = startNonceOperation(m, nonceGroup);
boolean success = false;
try {
r = region.append(append, nonceGroup, nonce);
success = true;
} finally {
endNonceOperation(m, nonceGroup, success);
}
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postAppend(append, r);
}
}
其中startNonceOperation()方法代码如下:
private long startNonceOperation(final MutationProto mutation, long nonceGroup)
throws IOException, OperationConflictException {
if (regionServer.nonceManager == null || !mutation.hasNonce()) return HConstants.NO_NONCE;
boolean canProceed = false;
try {
canProceed = regionServer.nonceManager.startOperation(
nonceGroup, mutation.getNonce(), regionServer);
} catch (InterruptedException ex) {
throw new InterruptedIOException("Nonce start operation interrupted");
}
if (!canProceed) {
// TODO: instead, we could convert append/increment to get w/mvcc
String message = "The operation with nonce {" + nonceGroup + ", " + mutation.getNonce()
+ "} on row [" + Bytes.toString(mutation.getRow().toByteArray())
+ "] may have already completed";
throw new OperationConflictException(message);
}
return mutation.getNonce();
}
处理逻辑如下:
- 如果RegionServer上的nonceManager为null,或者该nutation不存在nonce,直接返回HConstants
- 设置标志位,表示是否可以运行
- 调用RegionServer上nonceManager的startOperation()方法,确定是否可以执行该操作
- 如果不能运行,抛出OperationConflictException异常
- 返回mutation的nonce
如步骤三,此方法会调用RegionServer上nonceManaget的startOperation()方案,确定是否可以执行该操作。



