int requestsToProcess = 0;
boolean commitIsWaiting = false;
do
{
// 如果有请求对象等待提交则为true,否则为false
commitIsWaiting = !committedRequests.isEmpty();
// 除了等待提交的请求,还有处于排队的请求
requestsToProcess = queuedRequests.size();
// 排队请求为0,等待提交请求为0
if (requestsToProcess == 0 && !commitIsWaiting)
{
synchronized (this)
{
while (!stopped && requestsToProcess == 0 && !commitIsWaiting)
{
// 持续等待
wait();
commitIsWaiting = !committedRequests.isEmpty();
requestsToProcess = queuedRequests.size();
}
}
}
// 到这里,要么有请求排队等待,要么有请求等待提交
ServerMetrics.getMetrics().
READS_QUEUED_IN_COMMIT_PROCESSOR.add(numReadQueuedRequests.get());
ServerMetrics.getMetrics().
WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(numWriteQueuedRequests.get());
ServerMetrics.getMetrics().
COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(committedRequests.size());
// 记录此时时间
long time = Time.currentElapsedTime();
// 请求对象
Request request;
int readsProcessed = 0;
// 如果 排队请求大于0
// 且 一次性读不限制,或未达到限制
// 且有请求在排队
while (!stopped
&& requestsToProcess > 0
&& (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize)
&& (request = queuedRequests.poll()) != null)
{
// 排队等待请求个数减1
requestsToProcess--;
// 如果请求需要提交
// 或者请求所属会话已有处于等待的请求
if (needCommit(request) || pendingRequests.containsKey(request.sessionId))
{
// 会话下阻塞等待请求集合
Deque requests
= pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>());
// 会话下阻塞等待的请求集合加入取出请求
requests.addLast(request);
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
}
else
{
// 请求不需提交,且请求所属会话无阻塞等待请求
// 更新处理的读请求个数
readsProcessed++;
// 更新排队读请求个数
numReadQueuedRequests.decrementAndGet();
// 读请求直接交给下一级处理
sendTonextProcessor(request);
}
// 如果 不限制处理读个书
// 且 仍有某些会话下有阻塞等待请求
// 且 等待提交处理请求存在
if (maxReadBatchSize < 0 && !pendingRequests.isEmpty() && !committedRequests.isEmpty())
{
// 有请求在等待提交
commitIsWaiting = true;
// 跳出
break;
}
}
ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed);
// 无请求在等待提交
if (!commitIsWaiting)
{
// 重新确定是否有请求在等待提交
commitIsWaiting = !committedRequests.isEmpty();
}
// 有请求在等待提交
if (commitIsWaiting && !stopped)
{
waitForEmptyPool();
// 处理中请求个书
int numRequestsInProcess = numRequestsProcessing.get();
// 有请求在处理中
if (numRequestsInProcess != 0)
{
ServerMetrics.getMetrics().
CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR.
add(numRequestsInProcess);
}
// 此时时间
long startWaitTime = Time.currentElapsedTime();
synchronized (emptyPoolSync)
{
// 有请求在处理中
while ((!stopped) && isProcessingRequest())
{
// 阻塞等待
emptyPoolSync.wait();
}
}
ServerMetrics.getMetrics().
TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ.
add(Time.currentElapsedTime() - startWaitTime);
if (stopped)
{
return;
}
// 最大提交请求个数
int commitsToProcess = maxCommitBatchSize;
Set queuesToDrain = new HashSet<>();
// 当前时间
long startWriteTime = Time.currentElapsedTime();
int commitsProcessed = 0;
// 有请求在等待提交
// 且 剩余可提交请求大于0
while (commitIsWaiting && !stopped && commitsToProcess > 0)
{
// 取出一个待提交请求
request = committedRequests.peek();
if (request.isThrottled())
{
LOG.error("Throttled request in committed pool: {}. Exiting.", request);
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
// 排队等待写请求存在
// 排在首个写请求和取出的提交请求的会话id,cxid均一致
if (!queuedWriteRequests.isEmpty()
&& queuedWriteRequests.peek().sessionId == request.sessionId
&& queuedWriteRequests.peek().cxid == request.cxid)
{
// 取出此会话id下所有排队等待请求
Deque sessionQueue = pendingRequests.get(request.sessionId);
ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
// 此会话下没有排队等待请求
// 或排在首个的请求不需要提交
if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek()))
{
// 跳出
break;
}
else
{
ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
// 取出会话id下排在首个的请求
Request topPending = sessionQueue.poll();
// 以取出的提交请求设置排队中请求事务信息
// 为排在首个的请求设置事务头
topPending.setHdr(request.getHdr());
// 为排在首个的请求设置事务体
topPending.setTxn(request.getTxn());
// 为排在首个的请求设置事务摘要
topPending.setTxnDigest(request.getTxnDigest());
// 设置排在首个的请求的zxid
topPending.zxid = request.zxid;
// 设置排在首个的请求的提交接收时间
topPending.commitRecvTime = request.commitRecvTime;
// 设置request为排在首个请求
request = topPending;
if (request.isThrottled())
{
LOG.error("Throttled request in committed & pending pool: {}. Exiting.", request);
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
// 更新排队写请求个数
numWriteQueuedRequests.decrementAndGet();
// 删除首个排队写请求
queuedWriteRequests.poll();
// 将提交请求的会话id加入queuesToDrain
queuesToDrain.add(request.sessionId);
}
}
// 移除首个待提交请求
committedRequests.remove();
// 待提交请求个数减1
commitsToProcess--;
// 提交处理个数加1
commitsProcessed++;
// 对提交请求进行processWrite
processWrite(request);
processCommitMetrics(request, true);
// 是写请求
if (isWrite)
{
if (request.commitProcQueueStartTime != -1 && request.commitRecvTime != -1)
{
// 当前时间
long currentTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().
WRITE_COMMITPROC_TIME.
add(currentTime - request.commitProcQueueStartTime);
ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.
add(currentTime - request.commitRecvTime);
}
else if (request.commitRecvTime != -1)
{
ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.
add(Time.currentElapsedTime() - request.commitRecvTime);
}
}
// 读请求
else
{
if (request.commitProcQueueStartTime != -1)
{
ServerMetrics.getMetrics().READ_COMMITPROC_TIME.
add(Time.currentElapsedTime() - request.commitProcQueueStartTime);
}
}
// 当前时间
long timeBeforeFinalProc = Time.currentElapsedTime();
// 将请求提交给下一级处理
nextProcessor.processRequest(request);
ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.
add(Time.currentElapsedTime() - timeBeforeFinalProc);
// 更新是否有请求等待提交
commitIsWaiting = !committedRequests.isEmpty();
}
ServerMetrics.getMetrics().
WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR.
add(Time.currentElapsedTime() - startWriteTime);
ServerMetrics.getMetrics().
WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed);
// 已经处理的读请求个数
readsProcessed = 0;
// 对执行了请求提交的每个会话依次处理
for (Long sessionId : queuesToDrain)
{
// 获取此会话下所有阻塞等待请求
Deque sessionQueue = pendingRequests.get(sessionId);
// 写后读个数
int readsAfterWrite = 0;
// 如果 排队阻塞请求存在
// 且 首个排队请求不需提交
while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek()))
{
// 更新等待读请求个数
numReadQueuedRequests.decrementAndGet();
// 取出首个排队的读请求,执行sendTonextProcessor
sendTonextProcessor(sessionQueue.poll());
// 更新处理中请求个数
numRequestsProcessing.incrementAndGet();
CommitWorkRequest workRequest = new CommitWorkRequest(request);
public void doWork() throws RequestProcessorException
{
try
{
// 更新统计
processCommitMetrics(request, needCommit(request));
// 当前时间点
long timeBeforeFinalProc = Time.currentElapsedTime();
// 交给下一级处理
nextProcessor.processRequest(request);
if (needCommit(request))
{
ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.
add(Time.currentElapsedTime() - timeBeforeFinalProc);
}
else
{
ServerMetrics.getMetrics().READ_FINAL_PROC_TIME.
add(Time.currentElapsedTime() - timeBeforeFinalProc);
}
}
finally
{
// 更新处理中请求个数
if (numRequestsProcessing.decrementAndGet() == 0)
{
// 变为0时,执行唤醒
wakeuponEmpty();
}
}
}
workerPool.schedule(workRequest, request.sessionId);
// 更新写后读个数
readsAfterWrite++;
}
ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
// 累计所有会话的写后读个数
readsProcessed += readsAfterWrite;
// 如果当前处理会话没有等待的请求了
if (sessionQueue.isEmpty())
{
// 从pendingRequests中移除此会话关联项
pendingRequests.remove(sessionId);
}
}
ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
}
ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
endOfIteration();
} while (!stoppedMainLoop);
请求来源–commit
所有集群模式下来自自身客户端&从节点的写请求,
先构造提议,
发出提议,
收集ack,
过半确认后,通过commit进入这里
public void commit(Request request)
{
if (stopped || request == null)
{
return;
}
LOG.debug("Committing request:: {}", request);
// 请求的提交接收时间
request.commitRecvTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
// committedRequests中加入此请求
committedRequests.add(request);
// 唤醒线程
wakeup();
}
请求来源–processRequest
集群模式来自自身客户端&从节点发来的读请求,
在同步处理阶段处理后,
可以直达提交阶段,通过processRequest进入
public void processRequest(Request request)
{
if (stopped)
{
return;
}
LOG.debug("Processing request:: {}", request);
// 请求开始排队提交时间
request.commitProcQueueStartTime = Time.currentElapsedTime();
// 排队等待提交请求
queuedRequests.add(request);
// 如果直达的请求需要提交--似乎没必要
if (needCommit(request))
{
// queuedWriteRequests中加入此请求
queuedWriteRequests.add(request);
// 更新排队写请求个数
numWriteQueuedRequests.incrementAndGet();
}
else
{
// 更新排队读请求个数
numReadQueuedRequests.incrementAndGet();
}
// 唤醒
wakeup();
}



