目录
2021SC@SDUSC
Follower和Leader状态同步-代码细节分析
1.looking状态下选leader,产出leader和follower
2.follower调用follower.followLeader()和leader调用leader.lead()
3.leader.lead()对每一个节点创建一个LearnerCnxAcceptor()
4.等待follower注册
5.follower.followLeader()寻找、连接、注册leader
Learn.findLeader()
Learner.connectToLeader()
learn.registerWithLeader()
6.leader接收到follower注册信息,每个节点产生一个LearnerHandler()
接下篇
2021SC@SDUSC
Follower和Leader状态同步-代码细节分析
1.looking状态下选leader,产出leader和follower
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadonlyZooKeeperServer roZk =
new ReadonlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
2.follower调用follower.followLeader()和leader调用leader.lead()
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
3.leader.lead()对每一个节点创建一个LearnerCnxAcceptor()
void lead() throws IOException, InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimetaken = self.end_fle - self.start_fle;
self.setElectionTimetaken(electionTimetaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimetaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.tick.set(0);
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
4.等待follower注册
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
private volatile boolean stop = false;
public LearnerCnxAcceptor() {
super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
.getZooKeeperServerListener());
}
@Override
public void run() {
try {
while (!stop) {
Socket s = null;
boolean error = false;
try {
s = ss.accept();//等待follower注册
5.follower.followLeader()寻找、连接、注册leader
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimetaken = self.end_fle - self.start_fle;
self.setElectionTimetaken(electionTimetaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimetaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
QuorumServer leaderServer = findLeader(); //寻找leader
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);//连接leader
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);//向leader注册
Learn.findLeader()
protected QuorumServer findLeader() {
QuorumServer leaderServer = null;
// Find the leader by id
Vote current = self.getCurrentVote();
for (QuorumServer s : self.getView().values()) {
if (s.id == current.getId()) {
// Ensure we have the leader's correct IP address before
// attempting to connect.
s.recreateSocketAddresses();
leaderServer = s;
break;
}
}
if (leaderServer == null) {
LOG.warn("Couldn't find the leader with id = "
+ current.getId());
}
return leaderServer;
}
Learner.connectToLeader()
protected void connectToLeader(InetSocketAddress addr, String hostname)
throws IOException, InterruptedException, X509Exception {
this.sock = createSocket();
int initLimitTime = self.tickTime * self.initLimit;
int remainingInitLimitTime = initLimitTime;
long startNanoTime = nanoTime();
for (int tries = 0; tries < 5; tries++) {
try {
// recalculate the init limit time because retries sleep for 1000 milliseconds
remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
if (remainingInitLimitTime <= 0) {
LOG.error("initLimit exceeded on retries.");
throw new IOException("initLimit exceeded on retries.");
}
sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); //连接leader
learn.registerWithLeader()
protected long registerWithLeader(int pktType) throws IOException{
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
writePacket(qp, true);//向leader提交自己的信息
readPacket(qp); //接收leader返回的信息,包括zxid和newEpoch
6.leader接收到follower注册信息,每个节点产生一个LearnerHandler()
s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
接下篇
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadonlyZooKeeperServer roZk =
new ReadonlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader());
2.follower调用follower.followLeader()和leader调用leader.lead()
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
3.leader.lead()对每一个节点创建一个LearnerCnxAcceptor()
void lead() throws IOException, InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimetaken = self.end_fle - self.start_fle;
self.setElectionTimetaken(electionTimetaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimetaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.tick.set(0);
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
4.等待follower注册
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
private volatile boolean stop = false;
public LearnerCnxAcceptor() {
super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
.getZooKeeperServerListener());
}
@Override
public void run() {
try {
while (!stop) {
Socket s = null;
boolean error = false;
try {
s = ss.accept();//等待follower注册
5.follower.followLeader()寻找、连接、注册leader
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimetaken = self.end_fle - self.start_fle;
self.setElectionTimetaken(electionTimetaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimetaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
QuorumServer leaderServer = findLeader(); //寻找leader
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);//连接leader
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);//向leader注册
Learn.findLeader()
protected QuorumServer findLeader() {
QuorumServer leaderServer = null;
// Find the leader by id
Vote current = self.getCurrentVote();
for (QuorumServer s : self.getView().values()) {
if (s.id == current.getId()) {
// Ensure we have the leader's correct IP address before
// attempting to connect.
s.recreateSocketAddresses();
leaderServer = s;
break;
}
}
if (leaderServer == null) {
LOG.warn("Couldn't find the leader with id = "
+ current.getId());
}
return leaderServer;
}
Learner.connectToLeader()
protected void connectToLeader(InetSocketAddress addr, String hostname)
throws IOException, InterruptedException, X509Exception {
this.sock = createSocket();
int initLimitTime = self.tickTime * self.initLimit;
int remainingInitLimitTime = initLimitTime;
long startNanoTime = nanoTime();
for (int tries = 0; tries < 5; tries++) {
try {
// recalculate the init limit time because retries sleep for 1000 milliseconds
remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
if (remainingInitLimitTime <= 0) {
LOG.error("initLimit exceeded on retries.");
throw new IOException("initLimit exceeded on retries.");
}
sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); //连接leader
learn.registerWithLeader()
protected long registerWithLeader(int pktType) throws IOException{
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
writePacket(qp, true);//向leader提交自己的信息
readPacket(qp); //接收leader返回的信息,包括zxid和newEpoch
6.leader接收到follower注册信息,每个节点产生一个LearnerHandler()
s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
接下篇
void lead() throws IOException, InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimetaken = self.end_fle - self.start_fle;
self.setElectionTimetaken(electionTimetaken);
LOG.info("LEADING - LEADER ELECTION TOOK - {} {}", electionTimetaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);
try {
self.tick.set(0);
zk.loadData();
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
// Start thread that waits for connection requests from
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
4.等待follower注册
class LearnerCnxAcceptor extends ZooKeeperCriticalThread {
private volatile boolean stop = false;
public LearnerCnxAcceptor() {
super("LearnerCnxAcceptor-" + ss.getLocalSocketAddress(), zk
.getZooKeeperServerListener());
}
@Override
public void run() {
try {
while (!stop) {
Socket s = null;
boolean error = false;
try {
s = ss.accept();//等待follower注册
5.follower.followLeader()寻找、连接、注册leader
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimetaken = self.end_fle - self.start_fle;
self.setElectionTimetaken(electionTimetaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimetaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
QuorumServer leaderServer = findLeader(); //寻找leader
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);//连接leader
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);//向leader注册
Learn.findLeader()
protected QuorumServer findLeader() {
QuorumServer leaderServer = null;
// Find the leader by id
Vote current = self.getCurrentVote();
for (QuorumServer s : self.getView().values()) {
if (s.id == current.getId()) {
// Ensure we have the leader's correct IP address before
// attempting to connect.
s.recreateSocketAddresses();
leaderServer = s;
break;
}
}
if (leaderServer == null) {
LOG.warn("Couldn't find the leader with id = "
+ current.getId());
}
return leaderServer;
}
Learner.connectToLeader()
protected void connectToLeader(InetSocketAddress addr, String hostname)
throws IOException, InterruptedException, X509Exception {
this.sock = createSocket();
int initLimitTime = self.tickTime * self.initLimit;
int remainingInitLimitTime = initLimitTime;
long startNanoTime = nanoTime();
for (int tries = 0; tries < 5; tries++) {
try {
// recalculate the init limit time because retries sleep for 1000 milliseconds
remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
if (remainingInitLimitTime <= 0) {
LOG.error("initLimit exceeded on retries.");
throw new IOException("initLimit exceeded on retries.");
}
sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); //连接leader
learn.registerWithLeader()
protected long registerWithLeader(int pktType) throws IOException{
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
writePacket(qp, true);//向leader提交自己的信息
readPacket(qp); //接收leader返回的信息,包括zxid和newEpoch
6.leader接收到follower注册信息,每个节点产生一个LearnerHandler()
s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
接下篇
void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimetaken = self.end_fle - self.start_fle;
self.setElectionTimetaken(electionTimetaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {} {}", electionTimetaken,
QuorumPeer.FLE_TIME_UNIT);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
QuorumServer leaderServer = findLeader(); //寻找leader
try {
connectToLeader(leaderServer.addr, leaderServer.hostname);//连接leader
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);//向leader注册
Learn.findLeader()
protected QuorumServer findLeader() {
QuorumServer leaderServer = null;
// Find the leader by id
Vote current = self.getCurrentVote();
for (QuorumServer s : self.getView().values()) {
if (s.id == current.getId()) {
// Ensure we have the leader's correct IP address before
// attempting to connect.
s.recreateSocketAddresses();
leaderServer = s;
break;
}
}
if (leaderServer == null) {
LOG.warn("Couldn't find the leader with id = "
+ current.getId());
}
return leaderServer;
}
Learner.connectToLeader()
protected void connectToLeader(InetSocketAddress addr, String hostname)
throws IOException, InterruptedException, X509Exception {
this.sock = createSocket();
int initLimitTime = self.tickTime * self.initLimit;
int remainingInitLimitTime = initLimitTime;
long startNanoTime = nanoTime();
for (int tries = 0; tries < 5; tries++) {
try {
// recalculate the init limit time because retries sleep for 1000 milliseconds
remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
if (remainingInitLimitTime <= 0) {
LOG.error("initLimit exceeded on retries.");
throw new IOException("initLimit exceeded on retries.");
}
sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); //连接leader
learn.registerWithLeader()
protected long registerWithLeader(int pktType) throws IOException{
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
writePacket(qp, true);//向leader提交自己的信息
readPacket(qp); //接收leader返回的信息,包括zxid和newEpoch
6.leader接收到follower注册信息,每个节点产生一个LearnerHandler()
s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
接下篇
protected void connectToLeader(InetSocketAddress addr, String hostname)
throws IOException, InterruptedException, X509Exception {
this.sock = createSocket();
int initLimitTime = self.tickTime * self.initLimit;
int remainingInitLimitTime = initLimitTime;
long startNanoTime = nanoTime();
for (int tries = 0; tries < 5; tries++) {
try {
// recalculate the init limit time because retries sleep for 1000 milliseconds
remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
if (remainingInitLimitTime <= 0) {
LOG.error("initLimit exceeded on retries.");
throw new IOException("initLimit exceeded on retries.");
}
sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime)); //连接leader
learn.registerWithLeader()
protected long registerWithLeader(int pktType) throws IOException{
long lastLoggedZxid = self.getLastLoggedZxid();
QuorumPacket qp = new QuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());
writePacket(qp, true);//向leader提交自己的信息
readPacket(qp); //接收leader返回的信息,包括zxid和newEpoch
6.leader接收到follower注册信息,每个节点产生一个LearnerHandler()
s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();
接下篇
s = ss.accept();
// start with the initLimit, once the ack is processed
// in LearnerHandler switch to the syncLimit
s.setSoTimeout(self.tickTime * self.initLimit);
s.setTcpNoDelay(nodelay);
BufferedInputStream is = new BufferedInputStream(
s.getInputStream());
LearnerHandler fh = new LearnerHandler(s, is, Leader.this);
fh.start();



