概要:一、zk数据不一致场景数据不一致场景
1、zk过半成功,剩余未commit的节点
2、leader未发送proposal宕机
3、leader发送proposal成功,发送commit前宕机
1、zk过半成功,剩余未commit的节点
场景:比如5个节点,有三个返回写入成功,则如果有读请求到另两个节点,则会数据不一致。
解决方案:sync API
原理:sync是使client当前连接的Zookeeper服务器,和zk的Leader节点同步(sync)一下数据。
1)客户端掉用zk.sync()方法,生成一个ZooDefs.OpCode.sync类型的请求;
2)当follower收到sync请求时,将进行以下步骤:
添加至ConcurrentlinkedQueue 当follower与Leader之前的通信时按顺序(TCP保证)发送的,因此follower会同步所有Leader之前(Lewader接收到sync之前)commit的所有决议。 添加到pendingSyncs队列中的类:FollowerRequestProcessor.java 将request发送至Leader的类:Learner.java Leader commit后移除pendingSyncs的类:Learner.java的tryToCommit()方法中 2、leader未发送proposal宕机 这也就是数据同步说过的问题。 leader与follower之间同步数据的步骤如下: leader接收写请求,发起一个proposal提议,并生成一个全局性的唯一递增ID(zxid),并放入一个FIFO队列;follower收到proposal提议后,以事务日志的形式写入本地磁盘,并返回ACK给leader;leader收到过半follower的ACK之后,发起commit给follower提交proposal,当过半提交成功后leader就会commit。
leader刚生成一个proposal,还没有来得及发送出去,此时leader宕机,重新选举之后作为follower,但是新的leader没有这个proposal。 这种场景下的日志将会被丢弃,也就是该条消息丢失。 3、leader发送proposal成功,发送commit前宕机 如果发送proposal成功了,但是在将要发送commit命令前宕机了,如果重新进行选举,还是会选择zxid最大的节点作为leader,因此,这个日志并不会被丢弃,会在选举出leader之后重新同步到其他节点当中。 public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request);
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this follower has pending, so we
// add it to pendingSyncs.
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
... ...
void request(Request request) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId);
oa.writeInt(request.cxid);
oa.writeInt(request.type);
if (request.request != null) {
request.request.rewind();
int len = request.request.remaining();
byte b[] = new byte[len];
request.request.get(b);
request.request.rewind();
oa.write(b);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
.toByteArray(), request.authInfo);
writePacket(qp, true);
}
synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
... ...
zk.commitProcessor.commit(p.request);
if(pendingSyncs.containsKey(zxid)){
for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
sendSync(r);
}
}
return true;
}



