ZooKeeper Zab Protocol: Follower and Leader State Synchronization, Detailed Analysis (Part 2)

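Contents

2021SC@SDUSC

I. Follower and Leader state synchronization: code-level analysis

7. LearnerHandler.run() reads the packet sent by the follower, creates a new epoch, sends it to the follower, waits for the acknowledgment, and then starts message synchronization

8. The follower receives the new epoch and replies to the leader

Learner.registerWithLeader()

9. Message synchronization

(1) Queueing committed proposals

(2) LearnerHandler.queueCommittedProposals() queues each proposal and its Leader.COMMIT for the follower

(3) The follower receives Leader.COMMIT

(4) FollowerZooKeeperServer.commit() hands the committed request to the commit processor

(5) Sending the sync data

II. Summary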


2021SC@SDUSC

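I. Follower and Leader state synchronization: code-level analysis

7. LearnerHandler.run() reads the packet sent by the follower, creates a new epoch, sends it to the follower, waits for the acknowledgment, and then starts message synchronization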
    public void run() {
        try {
            leader.addLearnerHandler(this);
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;

            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            
            // Read the packet sent by the follower
            ia.readRecord(qp, "packet");


            if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != 
                Leader.OBSERVERINFO){
                LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }

            byte learnerInfoData[] = qp.getData();
            if (learnerInfoData != null) {
                ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                if (learnerInfoData.length >= 8) {
                    this.sid = bbsid.getLong();
                }
                if (learnerInfoData.length >= 12) {
                    this.version = bbsid.getInt(); // protocolVersion
                }
                if (learnerInfoData.length >= 20) {
                    long configVersion = bbsid.getLong();
                    if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
                        throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
                    }
                }
            } else {
                this.sid = leader.followerCounter.getAndDecrement();
            }

            if (leader.self.getView().containsKey(this.sid)) {
                LOG.info("Follower sid: " + this.sid + " : info : "
                        + leader.self.getView().get(this.sid).toString());
            } else {
                LOG.info("Follower sid: " + this.sid + " not in the current config "
                        + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
            }
                        
            if (qp.getType() == Leader.OBSERVERINFO) {
                  learnerType = LearnerType.OBSERVER;
            }

            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());

            long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();

            // Create (propose) the new epoch
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);

           if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                leader.waitForEpochAck(this.getSid(), ss);
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                

                // Send the new epoch to the follower
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
                oa.writeRecord(newEpochPacket, "packet");
                bufferedOutput.flush();
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
                }
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                // Wait for the epoch acknowledgments
                leader.waitForEpochAck(this.getSid(), ss);
            }
            peerLastZxid = ss.getLastZxid();
           
            // Decide how to sync the follower and whether a full snapshot is needed
            boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);


            // If a snapshot is needed, announce it to the follower with a SNAP packet
            if (needSnap) {
                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                LearnerSnapshot snapshot = 
                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                try {
                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    bufferedOutput.flush();


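The handler above relies on the zxid layout: the epoch sits in the high 32 bits and a per-epoch counter in the low 32 bits, which is why the new leader zxid is built with a counter of 0. A minimal sketch of that packing follows. The class name `ZxidSketch` and its method names are made up for illustration; only the bit layout is taken from how `ZxidUtils` is used in the excerpt.

```java
/** Illustrative helper (hypothetical name) showing how a zxid packs an epoch and a counter. */
final class ZxidSketch {
    private ZxidSketch() {}

    /** The epoch is stored in the high 32 bits. */
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    /** The per-epoch counter is stored in the low 32 bits. */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Combine an epoch and a counter into one 64-bit zxid. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    /** True when the counter is zero, i.e. the zxid marks the start of an epoch. */
    static boolean isNewEpochZxid(long zxid) {
        return counterOf(zxid) == 0;
    }

    public static void main(String[] args) {
        long newLeaderZxid = makeZxid(5, 0);
        System.out.println("0x" + Long.toHexString(newLeaderZxid)); // prints 0x500000000
        System.out.println(epochOf(newLeaderZxid) + " " + counterOf(newLeaderZxid)); // prints 5 0
    }
}
```

The same low-32-bit mask is what the sync code later uses to compute `isPeerNewEpochZxid`.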
 

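The handler in step 7 decodes the learner's first payload in three length-guarded steps: 8 bytes of sid, then 4 bytes of protocol version, then 8 bytes of configuration version. The sketch below mirrors that layout so the guards are easier to follow. `LearnerInfoSketch` is a hypothetical class, and the defaults used for missing fields (0 for the sid, 1 for the version, 0 for the config version) are assumptions made for this example, not values confirmed by the excerpt.

```java
import java.nio.ByteBuffer;

/** Hypothetical value class for the learner-info payload; not part of ZooKeeper. */
final class LearnerInfoSketch {
    final long sid;
    final int protocolVersion;
    final long configVersion;

    LearnerInfoSketch(long sid, int protocolVersion, long configVersion) {
        this.sid = sid;
        this.protocolVersion = protocolVersion;
        this.configVersion = configVersion;
    }

    /** Encode as sid (8 bytes), protocol version (4 bytes), config version (8 bytes). */
    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(20);
        buf.putLong(sid);
        buf.putInt(protocolVersion);
        buf.putLong(configVersion);
        return buf.array();
    }

    /** Decode leniently: a shorter payload simply leaves the later fields at assumed defaults. */
    static LearnerInfoSketch decode(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        long sid = data.length >= 8 ? buf.getLong() : 0L;
        int version = data.length >= 12 ? buf.getInt() : 1;
        long config = data.length >= 20 ? buf.getLong() : 0L;
        return new LearnerInfoSketch(sid, version, config);
    }
}
```

Reading the fields strictly in order is what lets a peer with a shorter payload still be understood: each guard only consumes bytes that are known to be present.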
8. The follower receives the new epoch and replies to the leader

Learner.registerWithLeader()
        writePacket(qp, true);
        // Receive the new epoch
        readPacket(qp);
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        if (qp.getType() == Leader.LEADERINFO) {
            // we are connected to a 1.0 server so accept the new epoch and read the next packet
            leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
            byte epochBytes[] = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            if (newEpoch > self.getAcceptedEpoch()) {
                wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
                self.setAcceptedEpoch(newEpoch);
            } else if (newEpoch == self.getAcceptedEpoch()) {
                // since we have already acked an epoch equal to the leaders, we cannot ack
                // again, but we still need to send our lastZxid to the leader so that we can
                // sync with it if it does assume leadership of the epoch.
                // the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
            } else {
                throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
            }

            // Acknowledge the new epoch to the leader
            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);
        } else {
            if (newEpoch > self.getAcceptedEpoch()) {
                self.setAcceptedEpoch(newEpoch);
            }
            if (qp.getType() != Leader.NEWLEADER) {
                LOG.error("First packet should have been NEWLEADER");
                throw new IOException("First packet should have been NEWLEADER");
            }
            return qp.getZxid();
        }
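
The three-way comparison in the LEADERINFO branch can be isolated as a small state machine. The sketch below is a simplification under stated assumptions: `EpochAckSketch` is a hypothetical class, it keeps the epochs in plain fields instead of the peer object, and it throws an unchecked `IllegalStateException` where the original throws `IOException`.

```java
/** Hypothetical model of the follower's reaction to a proposed epoch. */
final class EpochAckSketch {
    /** Payload value meaning "do not count this reply as an ack for the new epoch". */
    static final int NO_ACK = -1;

    private long acceptedEpoch;
    private final long currentEpoch;

    EpochAckSketch(long acceptedEpoch, long currentEpoch) {
        this.acceptedEpoch = acceptedEpoch;
        this.currentEpoch = currentEpoch;
    }

    /** Returns the int that would be written into the ACKEPOCH payload. */
    int onLeaderInfo(long newEpoch) {
        if (newEpoch > acceptedEpoch) {
            // First time this epoch is seen: accept it and report our current epoch.
            acceptedEpoch = newEpoch;
            return (int) currentEpoch;
        } else if (newEpoch == acceptedEpoch) {
            // Already acked this epoch once: reply, but mark the reply as not an ack.
            return NO_ACK;
        }
        // A leader proposing an older epoch than the accepted one is rejected.
        throw new IllegalStateException("Leader's epoch " + newEpoch
                + " is less than accepted epoch " + acceptedEpoch);
    }

    long acceptedEpoch() {
        return acceptedEpoch;
    }
}
```

For example, a follower with accepted epoch 3 and current epoch 3 that receives epoch 4 returns 3 and moves its accepted epoch to 4; a second proposal of epoch 4 returns -1.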

9. Message synchronization

(1) Queueing committed proposals
           

            if (forceSnapSync) {
                // Force leader to use snapshot to sync with follower
                LOG.warn("Forcing snapshot sync - should not see this in production");
            } else if (lastProcessedZxid == peerLastZxid) {
                // Follower is already sync with us, send empty diff
                LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
                         " for peer sid: " +  getSid());
                queueOpPacket(Leader.DIFF, peerLastZxid);
                needOpPacket = false;
                needSnap = false;
            } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
                // Newer than committedLog, send trunc and done
                LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
                          Long.toHexString(maxCommittedLog) +
                          " for peer sid:" +  getSid());
                queueOpPacket(Leader.TRUNC, maxCommittedLog);
                currentZxid = maxCommittedLog;
                needOpPacket = false;
                needSnap = false;
            } else if ((maxCommittedLog >= peerLastZxid)
                    && (minCommittedLog <= peerLastZxid)) {
                // Follower is within commitLog range
                LOG.info("Using committedLog for peer sid: " +  getSid());
                Iterator<Proposal> itr = db.getCommittedLog().iterator();
                // Queue the committed proposals for the follower
                currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                                     null, maxCommittedLog);
                needSnap = false;
            } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
                // Use txnlog and committedLog to sync

                // Calculate sizeLimit that we allow to retrieve txnlog from disk
                long sizeLimit = db.calculateTxnLogSizeLimit();
                // This method can return empty iterator if the requested zxid
                // is older than on-disk txnlog
                Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
                        peerLastZxid, sizeLimit);
                if (txnLogItr.hasNext()) {
                    LOG.info("Use txnlog and committedLog for peer sid: " +  getSid());
                    currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                                         minCommittedLog, maxCommittedLog);

                    LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
                    Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                    currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                                         null, maxCommittedLog);
                    needSnap = false;
                }
                // closing the resources
                if (txnLogItr instanceof TxnLogProposalIterator) {
                    TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
                    txnProposalItr.close();
                }
            } else {
                LOG.warn("Unhandled scenario for peer sid: " +  getSid());
            }
            LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
                      " for peer sid: " +  getSid());
            leaderLastZxid = leader.startForwarding(this, currentZxid);
        } finally {
            rl.unlock();
        }

        if (needOpPacket && !needSnap) {
            // This should never happen, but we should fall back to sending
            // snapshot just in case.
            LOG.error("Unhandled scenario for peer sid: " +  getSid() +
                     " fall back to use snapshot");
            needSnap = true;
        }

        return needSnap;
    }
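
The branch order in the excerpt above is easier to see as a pure function of the zxids involved. The sketch below is a simplified model, not the real method: `SyncStrategySketch` and its enum are hypothetical names, the forced-snapshot flag is left out, and it ignores the fallbacks visible in the excerpt (an empty txnlog iterator, or a committed-log scan that cannot send TRUNC across an epoch boundary), both of which end in a snapshot.

```java
/** Hypothetical model of how the leader picks a sync strategy for a learner. */
final class SyncStrategySketch {
    enum Strategy { EMPTY_DIFF, TRUNC, COMMITTED_LOG, TXNLOG_THEN_COMMITTED_LOG, SNAPSHOT }

    private SyncStrategySketch() {}

    static Strategy choose(long peerLastZxid, long lastProcessedZxid,
                           long minCommittedLog, long maxCommittedLog,
                           boolean txnLogSyncEnabled) {
        // A zero counter means the peer's last zxid only marks the start of an epoch.
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        if (lastProcessedZxid == peerLastZxid) {
            return Strategy.EMPTY_DIFF;                 // already in sync
        }
        if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
            return Strategy.TRUNC;                      // peer is ahead of the leader
        }
        if (minCommittedLog <= peerLastZxid && peerLastZxid <= maxCommittedLog) {
            return Strategy.COMMITTED_LOG;              // replay from the in-memory log
        }
        if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
            return Strategy.TXNLOG_THEN_COMMITTED_LOG;  // replay from disk, then memory
        }
        return Strategy.SNAPSHOT;                       // nothing cheaper applies
    }
}
```

With a committed log covering 0x100000001 to 0x100000005, a peer at 0x100000003 lands in the committed-log branch, while a peer at 0x100000009 is asked to truncate.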

(2) LearnerHandler.queueCommittedProposals() queues each proposal and its Leader.COMMIT for the follower
    protected long queueCommittedProposals(Iterator<Proposal> itr,
            long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
        boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
        long queuedZxid = peerLastZxid;
        // as we look through proposals, this variable keeps track of previous
        // proposal Id.
        long prevProposalZxid = -1;
        while (itr.hasNext()) {
            Proposal propose = itr.next();

            long packetZxid = propose.packet.getZxid();
            // abort if we hit the limit
            if ((maxZxid != null) && (packetZxid > maxZxid)) {
                break;
            }

            // skip the proposals the peer already has
            if (packetZxid < peerLastZxid) {
                prevProposalZxid = packetZxid;
                continue;
            }

            // If we are sending the first packet, figure out whether to trunc
            // or diff
            if (needOpPacket) {

                // Send diff when we see the follower's zxid in our history
                if (packetZxid == peerLastZxid) {
                    LOG.info("Sending DIFF zxid=0x" +
                             Long.toHexString(lastCommittedZxid) +
                             " for peer sid: " + getSid());
                    queueOpPacket(Leader.DIFF, lastCommittedZxid);
                    needOpPacket = false;
                    continue;
                }

                if (isPeerNewEpochZxid) {
                   // Send diff and fall through if zxid is of a new-epoch
                   LOG.info("Sending DIFF zxid=0x" +
                            Long.toHexString(lastCommittedZxid) +
                            " for peer sid: " + getSid());
                   queueOpPacket(Leader.DIFF, lastCommittedZxid);
                   needOpPacket = false;
                } else if (packetZxid > peerLastZxid  ) {
                    // Peer have some proposals that the leader hasn't seen yet
                    // it may used to be a leader
                    if (ZxidUtils.getEpochFromZxid(packetZxid) !=
                            ZxidUtils.getEpochFromZxid(peerLastZxid)) {
                        // We cannot send TRUNC that cross epoch boundary.
                        // The learner will crash if it is asked to do so.
                        // We will send snapshot in those cases.
                        LOG.warn("Cannot send TRUNC to peer sid: " + getSid() +
                                 " peer zxid is from different epoch" );
                        return queuedZxid;
                    }

                    LOG.info("Sending TRUNC zxid=0x" +
                            Long.toHexString(prevProposalZxid) +
                            " for peer sid: " + getSid());
                    queueOpPacket(Leader.TRUNC, prevProposalZxid);
                    needOpPacket = false;
                }
            }

            if (packetZxid <= queuedZxid) {
                // We can get here, if we don't have op packet to queue
                // or there is a duplicate txn in a given iterator
                continue;
            }

            // Since this is already a committed proposal, we need to follow
            // it by a commit packet
            queuePacket(propose.packet);
            queueOpPacket(Leader.COMMIT, packetZxid);
            queuedZxid = packetZxid;

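To make the loop above concrete, here is a reduced model of the DIFF path only. It assumes the peer's last zxid appears in the leader's history; when it does not, the sketch simply stops, whereas the excerpt sends TRUNC or falls back to a snapshot. `QueueProposalsSketch` is a hypothetical class, and packets are represented as plain strings rather than `QuorumPacket` objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, string-based model of queueing committed proposals for a peer. */
final class QueueProposalsSketch {
    private QueueProposalsSketch() {}

    /** Returns the packets that would be queued, in order. */
    static List<String> queue(List<Long> committedZxids, long peerLastZxid) {
        List<String> out = new ArrayList<>();
        if (committedZxids.isEmpty()) {
            return out;
        }
        long lastCommitted = committedZxids.get(committedZxids.size() - 1);
        boolean needOpPacket = true;
        long queuedZxid = peerLastZxid;
        for (long zxid : committedZxids) {
            if (zxid < peerLastZxid) {
                continue;                       // the peer already has this proposal
            }
            if (needOpPacket) {
                if (zxid == peerLastZxid) {
                    out.add("DIFF 0x" + Long.toHexString(lastCommitted));
                    needOpPacket = false;
                    continue;
                }
                break;                          // TRUNC and snapshot cases are not modelled
            }
            if (zxid <= queuedZxid) {
                continue;                       // duplicate entry in the iterator
            }
            // A committed proposal is always followed by its COMMIT packet.
            out.add("PROPOSAL 0x" + Long.toHexString(zxid));
            out.add("COMMIT 0x" + Long.toHexString(zxid));
            queuedZxid = zxid;
        }
        return out;
    }
}
```

For a history of zxids 1 to 4 and a peer at zxid 2, the result is one DIFF packet followed by a PROPOSAL and COMMIT pair for zxid 3 and another pair for zxid 4.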
(3) The follower receives Leader.COMMIT
    protected void processPacket(QuorumPacket qp) throws Exception{
        switch (qp.getType()) {
        case Leader.PING:            
            ping(qp);            
            break;
        case Leader.PROPOSAL:           
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            
            if (hdr.getType() == OpCode.reconfig){
               SetDataTxn setDataTxn = (SetDataTxn) txn;       
               QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
               self.setLastSeenQuorumVerifier(qv, true);                               
            }
            
            fzk.logRequest(hdr, txn);
            break;

        // Receive Leader.COMMIT
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;
 

(4) FollowerZooKeeperServer.commit() hands the committed request to the commit processor
    public void commit(long zxid) {
        if (pendingTxns.size() == 0) {
            LOG.warn("Committing " + Long.toHexString(zxid)
                    + " without seeing txn");
            return;
        }
        long firstElementZxid = pendingTxns.element().zxid;
        if (firstElementZxid != zxid) {
            LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
                    + " but next pending txn 0x"
                    + Long.toHexString(firstElementZxid));
            System.exit(12);
        }

        // Remove the matching pending request and hand it to the commit processor
        Request request = pendingTxns.remove();
        commitProcessor.commit(request);
    }

    synchronized public void sync(){
        if(pendingSyncs.size() ==0){
            LOG.warn("Not expecting a sync.");
            return;
        }

        Request r = pendingSyncs.remove();
        commitProcessor.commit(r);
    }
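
The commit() method above enforces that commits arrive in the same order as the proposals that were logged. A minimal sketch of that invariant follows. `PendingTxnSketch` is a hypothetical class that tracks bare zxids instead of `Request` objects, and it throws an exception where the original calls `System.exit(12)`.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model of the follower's pending-transaction queue. */
final class PendingTxnSketch {
    private final Queue<Long> pendingTxns = new ArrayDeque<>();

    /** Called when a PROPOSAL is logged: remember it until its COMMIT arrives. */
    void logRequest(long zxid) {
        pendingTxns.add(zxid);
    }

    /** Returns true if the commit matched the head of the queue and was applied. */
    boolean commit(long zxid) {
        if (pendingTxns.isEmpty()) {
            return false;                       // commit without having seen the txn
        }
        long first = pendingTxns.element();
        if (first != zxid) {
            // The original treats an out-of-order commit as fatal and exits the process.
            throw new IllegalStateException("Committing 0x" + Long.toHexString(zxid)
                    + " but next pending txn is 0x" + Long.toHexString(first));
        }
        pendingTxns.remove();
        return true;
    }

    int pending() {
        return pendingTxns.size();
    }
}
```

A FIFO queue is enough here because proposals and commits from the leader travel over one ordered connection, so any mismatch at the head signals a protocol violation rather than reordering.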

(5) Sending the sync data
            
            if (needSnap) {
                boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                LearnerSnapshot snapshot = 
                        leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                try {
                    long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                    oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
                    bufferedOutput.flush();

II. Summary

This concludes the two-part code analysis of Follower and Leader state synchronization in the ZooKeeper Zab protocol.
