栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

zookeeper源码解析--请求处理--CommitProcessor

zookeeper源码解析--请求处理--CommitProcessor

CommitProcessor
int requestsToProcess = 0;
boolean commitIsWaiting = false;
do 
{
	// 如果有请求对象等待提交则为true,否则为false
    commitIsWaiting = !committedRequests.isEmpty();
    // 除了等待提交的请求,还有处于排队的请求
    requestsToProcess = queuedRequests.size();
    // 排队请求为0,等待提交请求为0
    if (requestsToProcess == 0 && !commitIsWaiting) 
    {
        synchronized (this) 
        {
            while (!stopped && requestsToProcess == 0 && !commitIsWaiting) 
            {
            	// 持续等待
                wait();
                commitIsWaiting = !committedRequests.isEmpty();
                requestsToProcess = queuedRequests.size();
            }
        }
    }

    // 到这里,要么有请求排队等待,要么有请求等待提交
    ServerMetrics.getMetrics().
    	READS_QUEUED_IN_COMMIT_PROCESSOR.add(numReadQueuedRequests.get());
    ServerMetrics.getMetrics().
    	WRITES_QUEUED_IN_COMMIT_PROCESSOR.add(numWriteQueuedRequests.get());
    ServerMetrics.getMetrics().
    	COMMITS_QUEUED_IN_COMMIT_PROCESSOR.add(committedRequests.size());
    // 记录此时时间
    long time = Time.currentElapsedTime();
    // 请求对象
    Request request;
    int readsProcessed = 0;
   	
   	// 如果 排队请求大于0 
   	// 且 一次性读不限制,或未达到限制
   	// 且有请求在排队
    while (!stopped 
    	&& requestsToProcess > 0 
    	&& (maxReadBatchSize < 0 || readsProcessed <= maxReadBatchSize) 
    	&& (request = queuedRequests.poll()) != null) 
    {
    	// 排队等待请求个数减1
        requestsToProcess--;
        // 如果请求需要提交
        // 或者请求所属会话已有处于等待的请求
        if (needCommit(request) || pendingRequests.containsKey(request.sessionId)) 
        {
        	// 会话下阻塞等待请求集合
            Deque requests 
            	= pendingRequests.computeIfAbsent(request.sessionId, sid -> new ArrayDeque<>());
            // 会话下阻塞等待的请求集合加入取出请求
            requests.addLast(request);
            ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(requests.size());
        } 
        else 
        {
            // 请求不需提交,且请求所属会话无阻塞等待请求
            // 更新处理的读请求个数
            readsProcessed++;
            // 更新排队读请求个数
            numReadQueuedRequests.decrementAndGet();
            // 读请求直接交给下一级处理
            sendTonextProcessor(request);
        }
        
        // 如果 不限制处理读个书
        // 且 仍有某些会话下有阻塞等待请求
        // 且 等待提交处理请求存在
        if (maxReadBatchSize < 0 && !pendingRequests.isEmpty() && !committedRequests.isEmpty()) 
        {
        	// 有请求在等待提交
            commitIsWaiting = true;
            // 跳出
            break;
        }
    }

    ServerMetrics.getMetrics().READS_ISSUED_IN_COMMIT_PROC.add(readsProcessed);
    // 无请求在等待提交
    if (!commitIsWaiting) 
    {
    	// 重新确定是否有请求在等待提交
        commitIsWaiting = !committedRequests.isEmpty();
    }
	
	// 有请求在等待提交
    if (commitIsWaiting && !stopped) 
    {
        waitForEmptyPool();
        	// 处理中请求个书
        	int numRequestsInProcess = numRequestsProcessing.get();
        	// 有请求在处理中
	        if (numRequestsInProcess != 0) 
	        {
	            ServerMetrics.getMetrics().
	            	CONCURRENT_REQUEST_PROCESSING_IN_COMMIT_PROCESSOR.
	            	add(numRequestsInProcess);
	        }
			// 此时时间
	        long startWaitTime = Time.currentElapsedTime();
	        synchronized (emptyPoolSync) 
	        {
	        	// 有请求在处理中
	            while ((!stopped) && isProcessingRequest()) 
	            {
	            	// 阻塞等待
	                emptyPoolSync.wait();
	            }
	        }
	
	        ServerMetrics.getMetrics().
	        	TIME_WAITING_EMPTY_POOL_IN_COMMIT_PROCESSOR_READ.
	        	add(Time.currentElapsedTime() - startWaitTime);
        if (stopped) 
        {
            return;
        }
        
        // 最大提交请求个数
        int commitsToProcess = maxCommitBatchSize;
        Set queuesToDrain = new HashSet<>();
        // 当前时间
        long startWriteTime = Time.currentElapsedTime();
        int commitsProcessed = 0;
        // 有请求在等待提交
        // 且 剩余可提交请求大于0
        while (commitIsWaiting && !stopped && commitsToProcess > 0) 
        {
         	// 取出一个待提交请求
            request = committedRequests.peek();
            if (request.isThrottled()) 
            {
                LOG.error("Throttled request in committed pool: {}. Exiting.", request);
                ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
            }

           // 排队等待写请求存在
           // 排在首个写请求和取出的提交请求的会话id,cxid均一致
            if (!queuedWriteRequests.isEmpty() 
            	&& queuedWriteRequests.peek().sessionId == request.sessionId 
            	&& queuedWriteRequests.peek().cxid == request.cxid) 
            {
            	// 取出此会话id下所有排队等待请求
                Deque sessionQueue = pendingRequests.get(request.sessionId);
                ServerMetrics.getMetrics().PENDING_SESSION_QUEUE_SIZE.add(pendingRequests.size());
                // 此会话下没有排队等待请求
                // 或排在首个的请求不需要提交
                if (sessionQueue == null || sessionQueue.isEmpty() || !needCommit(sessionQueue.peek())) 
                {
                	// 跳出
                    break;
                } 
                else 
                {
                    ServerMetrics.getMetrics().REQUESTS_IN_SESSION_QUEUE.add(sessionQueue.size());
                    // 取出会话id下排在首个的请求
                    Request topPending = sessionQueue.poll();
                    // 以取出的提交请求设置排队中请求事务信息
                    // 为排在首个的请求设置事务头
                    topPending.setHdr(request.getHdr());
                    // 为排在首个的请求设置事务体
                    topPending.setTxn(request.getTxn());
                    // 为排在首个的请求设置事务摘要
                    topPending.setTxnDigest(request.getTxnDigest());
                    // 设置排在首个的请求的zxid
                    topPending.zxid = request.zxid;
                    // 设置排在首个的请求的提交接收时间
                    topPending.commitRecvTime = request.commitRecvTime;
                    // 设置request为排在首个请求
                    request = topPending;
                    if (request.isThrottled()) 
                    {
                        LOG.error("Throttled request in committed & pending pool: {}. Exiting.", request);
                        ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
                    }
                    
                    // 更新排队写请求个数
                    numWriteQueuedRequests.decrementAndGet();
                    // 删除首个排队写请求
                    queuedWriteRequests.poll();
                    // 将提交请求的会话id加入queuesToDrain
                    queuesToDrain.add(request.sessionId);
                }
            }

			// 移除首个待提交请求
            committedRequests.remove();
            // 待提交请求个数减1
            commitsToProcess--;
            // 提交处理个数加1
            commitsProcessed++;
            // 对提交请求进行processWrite
            processWrite(request);
            	processCommitMetrics(request, true);
            		// 是写请求
            		if (isWrite) 
			        {
			            if (request.commitProcQueueStartTime != -1 && request.commitRecvTime != -1) 
			            {
			            	// 当前时间
			                long currentTime = Time.currentElapsedTime();
			                ServerMetrics.getMetrics().
			                	WRITE_COMMITPROC_TIME.
			                	add(currentTime - request.commitProcQueueStartTime);
			                ServerMetrics.getMetrics().LOCAL_WRITE_COMMITTED_TIME.
			                	add(currentTime - request.commitRecvTime);
			            } 
			            else if (request.commitRecvTime != -1) 
			            {
			                ServerMetrics.getMetrics().SERVER_WRITE_COMMITTED_TIME.
			                	add(Time.currentElapsedTime() - request.commitRecvTime);
			            }
			        } 
			        // 读请求
			        else 
			        {
			            if (request.commitProcQueueStartTime != -1) 
			            {
			                ServerMetrics.getMetrics().READ_COMMITPROC_TIME.
			                	add(Time.currentElapsedTime() - request.commitProcQueueStartTime);
			            }
			        }
			    // 当前时间
		        long timeBeforeFinalProc = Time.currentElapsedTime();
		        // 将请求提交给下一级处理
		        nextProcessor.processRequest(request);
		        ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.
		        	add(Time.currentElapsedTime() - timeBeforeFinalProc);
            // 更新是否有请求等待提交
            commitIsWaiting = !committedRequests.isEmpty();
        }

        ServerMetrics.getMetrics().
        	WRITE_BATCH_TIME_IN_COMMIT_PROCESSOR.
        	add(Time.currentElapsedTime() - startWriteTime);
        ServerMetrics.getMetrics().
        	WRITES_ISSUED_IN_COMMIT_PROC.add(commitsProcessed);
        // 已经处理的读请求个数
        readsProcessed = 0;
        // 对执行了请求提交的每个会话依次处理
        for (Long sessionId : queuesToDrain) 
        {
        	// 获取此会话下所有阻塞等待请求
            Deque sessionQueue = pendingRequests.get(sessionId);
            // 写后读个数
            int readsAfterWrite = 0;
            // 如果 排队阻塞请求存在
            // 且 首个排队请求不需提交
            while (!stopped && !sessionQueue.isEmpty() && !needCommit(sessionQueue.peek())) 
            {
            	// 更新等待读请求个数
                numReadQueuedRequests.decrementAndGet();
                // 取出首个排队的读请求,执行sendTonextProcessor
                sendTonextProcessor(sessionQueue.poll());
                	// 更新处理中请求个数
                	numRequestsProcessing.incrementAndGet();
			        CommitWorkRequest workRequest = new CommitWorkRequest(request);
			        	public void doWork() throws RequestProcessorException 
				        {
				            try 
				            {
				            	// 更新统计
				                processCommitMetrics(request, needCommit(request));
				                // 当前时间点
				                long timeBeforeFinalProc = Time.currentElapsedTime();
				                // 交给下一级处理
				                nextProcessor.processRequest(request);
				                if (needCommit(request)) 
				                {
				                    ServerMetrics.getMetrics().WRITE_FINAL_PROC_TIME.
				                    	add(Time.currentElapsedTime() - timeBeforeFinalProc);
				                } 
				                else 
				                {
				                    ServerMetrics.getMetrics().READ_FINAL_PROC_TIME.
				                    	add(Time.currentElapsedTime() - timeBeforeFinalProc);
				                }
				            } 
				            finally 
				            {
				            	// 更新处理中请求个数
				                if (numRequestsProcessing.decrementAndGet() == 0) 
				                {
				                	// 变为0时,执行唤醒
				                    wakeuponEmpty();
				                }
				            }
				        }
			        workerPool.schedule(workRequest, request.sessionId);
                // 更新写后读个数
                readsAfterWrite++;
            }
            
            ServerMetrics.getMetrics().READS_AFTER_WRITE_IN_SESSION_QUEUE.add(readsAfterWrite);
            // 累计所有会话的写后读个数
            readsProcessed += readsAfterWrite;
            // 如果当前处理会话没有等待的请求了
            if (sessionQueue.isEmpty()) 
            {
            	// 从pendingRequests中移除此会话关联项
                pendingRequests.remove(sessionId);
            }
        }

        ServerMetrics.getMetrics().SESSION_QUEUES_DRAINED.add(queuesToDrain.size());
        ServerMetrics.getMetrics().READ_ISSUED_FROM_SESSION_QUEUE.add(readsProcessed);
    }

    ServerMetrics.getMetrics().COMMIT_PROCESS_TIME.add(Time.currentElapsedTime() - time);
    endOfIteration();
} while (!stoppedMainLoop);
请求来源–commit

所有集群模式下来自自身客户端&从节点的写请求,
先构造提议,
发出提议,
收集ack,
过半确认后,通过commit进入这里

public void commit(Request request) 
{
    if (stopped || request == null) 
    {
        return;
    }
    
    LOG.debug("Committing request:: {}", request);
    // 请求的提交接收时间
    request.commitRecvTime = Time.currentElapsedTime();
    ServerMetrics.getMetrics().COMMITS_QUEUED.add(1);
    // committedRequests中加入此请求
    committedRequests.add(request);
    // 唤醒线程
    wakeup();
}
请求来源–processRequest

集群模式来自自身客户端&从节点发来的读请求,
在同步处理阶段处理后,
可以直达提交阶段,通过processRequest进入

public void processRequest(Request request) 
{
    if (stopped) 
    {
        return;
    }
    
    LOG.debug("Processing request:: {}", request);
    // 请求开始排队提交时间
    request.commitProcQueueStartTime = Time.currentElapsedTime();
    // 排队等待提交请求
    queuedRequests.add(request);
    // 如果直达的请求需要提交--似乎没必要
    if (needCommit(request)) 
    {
    	// queuedWriteRequests中加入此请求
        queuedWriteRequests.add(request);
        // 更新排队写请求个数
        numWriteQueuedRequests.incrementAndGet();
    } 
    else 
    {
    	// 更新排队读请求个数
        numReadQueuedRequests.incrementAndGet();
    }
    
    // 唤醒
    wakeup();
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/710286.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号