目录
2021SC@SDUSC
一、消息广播是什么?
二、消息广播过程
三、源码解析
1.Leader.propose()
[1]、生成相应的Proposal,发送给Follower
[2]、接收Follower的ack
2.Follower.followLeader()
[1]、读取leader返回的结果:leaderinfo
[2]、发送ack给leader
3、LearnerHander.syncFollower()
4.LearnerHandleader.queueCommittedProposals()
5.Follower.processPacket()
6.FollowerZooKeeperServer.commit()
2021SC@SDUSC
一、消息广播是什么?
消息广播是Zab协议的一种基本模式,当集群中已经有过半的follower与leader服务器完成了状态同步,那么整个zk集群就可以进入消息广播模式了。如果集群中的其他节点收到客户端地事务请求,那么这些非leader服务器会首先将这个事务请求转发给leader服务器。
ZAB协议的消息广播过程使用的是一个原子广播协议,针对每个客户端的事务请求,leader服务器会为其生成对应的事务Proposal,并将其发送给集群中其余所有的机器,然后再分别收集各自的选票,最后进行事务提交。
二、消息广播过程
1.Leader 接收到客户端新的事务请求之后,会生成对应的事务Proposal,并根据ZXID的顺序向所有Follower发送提案。
2.Follower根据消息接收的先后次序来处理这些来自Leader的事务Proposal,并将他们追加到hf中,之后再反馈给Leader。
3.当Leader接收到来自过半Follower针对事务Proposal的Ack消息后,就会发送Commit消息给所有的Follower,要求他们进行事务的提交。
4.当Follower接收到来自Leader的Commit消息后,就会开始提交事务Proposal。
三、源码解析
1.Leader.propose()
[1]、生成相应的Proposal,发送给Follower
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
sendPacket(pp);
[2]、接收Follower的ack
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
2.Follower.followLeader()
[1]、读取leader返回的结果:leaderinfo
readPacket(qp);
[2]、发送ack给leader
[1]、生成相应的Proposal,发送给Follower
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
lastProposed = p.packet.getZxid();
outstandingProposals.put(lastProposed, p);
sendPacket(pp);
[2]、接收Follower的ack
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
2.Follower.followLeader()
[1]、读取leader返回的结果:leaderinfo
readPacket(qp);
[2]、发送ack给leader
qp = new QuorumPacket(); ia.readRecord(qp, "packet");
2.Follower.followLeader()
[1]、读取leader返回的结果:leaderinfo
readPacket(qp);
[2]、发送ack给leader
readPacket(qp);
[2]、发送ack给leader
case Leader.COMMIT:
fzk.commit(qp.getZxid());
Request request = fzk.pendingTxns.element();
commitProcessor.commit(request)
3、LearnerHander.syncFollower()
提交提案
currentZxid = queueCommittedProposals(itr, peerLastZxid,null, maxCommittedLog)
4.LearnerHandleader.queueCommittedProposals()
将Leader.COMMIT发给follower
queuePacket(propose.packet);
queueOpPacket(Leader.COMMIT, packetZxid);
queuedZxid = packetZxid;
5.Follower.processPacket()
接收Leader.COMMIT
case Leader.COMMIT:
fzk.commit(qp.getZxid());
6.FollowerZooKeeperServer.commit()
向Leader回复
Request request = pendingTxns.remove();
commitProcessor.commit(request);



