SyncRequestProcessor
AckRequestProcessor
// ProposalRequestProcessor request handling (fragment, no signature shown).
// Convention in these notes: a callee's body is sometimes pasted directly after the
// call for reference (here processSync() and sendSync()), so `r` below is
// processSync's parameter, i.e. the LearnerSyncRequest cast on the call line.
// Case 1: the request is a LearnerSyncRequest.
if (request instanceof LearnerSyncRequest)
{
// The leader handles it in processSync(); its body follows inline.
zks.getLeader().processSync((LearnerSyncRequest) request);
// No proposals are awaiting acks: answer the sync immediately.
if (outstandingProposals.isEmpty())
{
sendSync(r);
// Inlined body of sendSync(r): queue a Leader.SYNC packet on r's handler (r.fh).
QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
r.fh.queuePacket(qp);
}
else
{
// Proposals are still outstanding: park this sync request under the last
// proposed zxid. It is answered when that zxid commits (the pendingSyncs
// handling at the end of tryToCommit).
pendingSyncs.computeIfAbsent(lastProposed, k -> new ArrayList<>()).add(r);
}
}
else
{
// Case 2: every other request.
// NOTE(review): upstream ZooKeeper likely spells this helper
// shouldForwardToNextProcessor (capital N) — confirm before copying.
if (shouldForwardTonextProcessor(request))
{
// Hand the request to the next stage right away (the original note says this
// is the path for requests from directly connected clients — confirm).
nextProcessor.processRequest(request);
}
// A non-null txn header marks a state-changing (write) request.
if (request.getHdr() != null)
{
try
{
// The leader turns the write into a PROPOSAL and broadcasts it.
zks.getLeader().propose(request);
}
catch (XidRolloverException e)
{
// Surface zxid exhaustion to the processor chain as a RequestProcessorException.
throw new RequestProcessorException(e.getMessage(), e);
}
// After proposing, pass the request to syncProcessor (presumably the
// SyncRequestProcessor) so the leader logs the txn locally.
syncProcessor.processRequest(request);
}
}
propose
// Leader.propose (fragment, signature not shown): wrap a write request in a
// Leader.PROPOSAL packet, register it as outstanding, broadcast it, and return the
// Proposal. Throws XidRolloverException when the zxid counter is exhausted.
// A throttled request must never reach the proposal stage; treat it as fatal.
if (request.isThrottled())
{
LOG.error("Throttled request send as proposal: {}. Exiting.", request);
ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
}
// The low 32 bits of the zxid are the counter. Once they are all ones they cannot
// advance, so shut down to force a re-election and a new epoch.
if ((request.zxid & 0xffffffffL) == 0xffffffffL)
{
String msg =
"zxid lower 32 bits have rolled over, forcing re-election, and therefore new epoch start";
shutdown(msg);
throw new XidRolloverException(msg);
}
// Serialize the request; this becomes the proposal payload.
byte[] data = SerializeUtils.serializeRequest(request);
// Record the payload size in the proposal statistics.
proposalStats.setLastBufferSize(data.length);
// Packet type: Leader.PROPOSAL.
// Packet zxid: the request's zxid.
// Packet data: the serialized request.
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, data, null);
// Build the proposal object.
Proposal p = new Proposal();
p.packet = pp;
p.request = request;
synchronized (this)
{
// Attach the current quorum verifier (cluster membership) to the proposal.
p.addQuorumVerifier(self.getQuorumVerifier());
// For a reconfig, record the proposed configuration as the last seen one.
if (request.getHdr().getType() == OpCode.reconfig)
{
self.setLastSeenQuorumVerifier(request.qv, true);
}
// If a newer configuration has been seen, attach it as well (presumably so the
// proposal needs a quorum in both; see hasAllQuorums in tryToCommit).
if (self.getQuorumVerifier().getVersion() < self.getLastSeenQuorumVerifier().getVersion())
{
p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
LOG.debug("Proposing:: {}", request);
// Remember the zxid of the most recent proposal.
lastProposed = p.packet.getZxid();
// Register the proposal as outstanding (awaiting acks), keyed by that zxid.
outstandingProposals.put(lastProposed, p);
// Broadcast the proposal. The original note says to all followers/observers.
// NOTE(review): observers appear to learn about commits through INFORM /
// sendObserverPacket (see tryToCommit), so sendPacket likely targets followers only — confirm.
sendPacket(pp);
}
// Count the proposal in server metrics.
ServerMetrics.getMetrics().PROPOSAL_COUNT.add(1);
return p;
}
SyncRequestProcessor
// SyncRequestProcessor main loop (fragment): take requests from queuedRequests, append
// write txns to the txn log, batch them in toFlush for a group flush, and periodically
// start a snapshot on a background thread.
resetSnapshotStats();
// Random value in [0, snapCount/2); presumably staggers snapshot points so that
// servers do not all snapshot at the same moment.
randRoll = ThreadLocalRandom.current().nextInt(snapCount / 2);
// Random value below snapSizeInBytes/2 (presumably the size-based counterpart of randRoll).
randSize = Math.abs(ThreadLocalRandom.current().nextLong() % (snapSizeInBytes / 2));
// Treat the start time as the last flush time.
lastFlushTime = Time.currentElapsedTime();
while (true)
{
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
// Poll timeout: the smaller of the max write-queue poll time and the remaining flush delay.
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
// Next request, or null if none arrived within pollTime.
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
if (si == null)
{
// Queue went idle: flush the pending batch, then block for the next request.
flush();
si = queuedRequests.take();
}
// Sentinel request that terminates the loop.
if (si == REQUEST_OF_DEATH)
{
break;
}
// Processing start time for this request.
long startProcessTime = Time.currentElapsedTime();
ServerMetrics.
getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
// Append to the txn log. Throttled requests are skipped, and append returns false
// for requests without a txn header (non-writes).
if (!si.isThrottled() && zks.getZKDatabase().append(si))
{
// Enough has been logged since the last snapshot: take a new one.
if (shouldSnapshot())
{
// Reset the snapshot counters.
resetSnapshotStats();
// Roll the txn log (presumably flushes and closes out the current file so the
// next append opens a new one — confirm in rollLog).
zks.getZKDatabase().rollLog();
// Allow only one snapshot at a time; skip if one is already running.
if (!snapThreadMutex.tryAcquire())
{
LOG.warn("Too busy to snap, skipping");
}
else
{
// Snapshot on its own thread so this loop keeps running.
new ZooKeeperThread("Snapshot Thread")
{
public void run()
{
try
{
// Write the snapshot; snapThreadMutex is held for its whole duration.
zks.takeSnapshot();
}
catch (Exception e)
{
LOG.warn("Unexpected exception", e);
}
finally
{
// Release so a later snapshot can start.
snapThreadMutex.release();
}
}
}.start();
}
}
}
// Not appended (throttled or non-write request) and no writes are waiting to be
// flushed: pass it straight to the next processor without waiting for a flush.
else if (toFlush.isEmpty())
{
if (nextProcessor != null)
{
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable)
{
((Flushable) nextProcessor).flush();
}
}
continue;
}
// Batch the request until the next flush. This covers appended writes, and also
// non-writes that arrive behind pending writes (presumably to preserve ordering).
toFlush.add(si);
// Flush once the delay or batch-size limit is reached.
if (shouldFlush())
{
flush();
}
ServerMetrics.getMetrics().
SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
append
// Txn-log write path, shown as a chain of inlined method bodies:
//   ZKDatabase.append(si) -> snapLog.append(si) -> txnLog.append(hdr, txn, digest).
// Lines after the `return` are the callees' bodies, pasted for reference, not live code.
// ZKDatabase.append: count the txn and delegate.
txnCount.incrementAndGet();
return this.snapLog.append(si);
// snapLog.append: delegate to the txn log with the header, txn and digest.
txnLog.append(si.getHdr(), si.getTxn(), si.getTxnDigest());
// txnLog.append body (hdr, txn and digest are its parameters).
// Only write requests carry a txn header, so only they are logged.
if (hdr == null)
{
return false;
}
// Track the highest zxid seen; a zxid that does not increase only triggers a warning.
if (hdr.getZxid() <= lastZxidSeen)
{
LOG.warn(
"Current zxid {} is <= {} for {}", hdr.getZxid(), lastZxidSeen, Request.op2String(hdr.getType()));
}
else
{
lastZxidSeen = hdr.getZxid();
}
// No log file is open: start one named after this txn's zxid.
if (logStream == null)
{
LOG.info("Creating new log file: {}", Util.makeLogName(hdr.getZxid()));
// New txn log file.
logFileWrite = new File(logDir, Util.makeLogName(hdr.getZxid()));
fos = new FileOutputStream(logFileWrite);
logStream = new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
// Write the file header (magic, version, dbId).
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC, VERSION, dbId);
fhdr.serialize(oa, "fileheader");
logStream.flush();
// Tell the padding logic how large the file currently is.
filePadding.setCurrentSize(fos.getChannel().position());
// Register the file for the next commit(). Writes are not guaranteed to be on
// disk until commit() flushes (and, with forceSync, fsyncs) it.
streamsToFlush.add(fos);
}
// Pad the file if needed (presumably pre-allocates space ahead of the write position).
filePadding.padFile(fos.getChannel());
// Serialize header + txn + digest into a single entry.
byte[] buf = Util.marshallTxnEntry(hdr, txn, digest);
if (buf == null || buf.length == 0)
{
throw new IOException("Faulty serialization for header " + "and txn");
}
// Write the entry's checksum first...
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
// ...then the entry bytes themselves.
Util.writeTxnBytes(oa, buf);
return true;
commit
// Txn-log commit path, shown as a chain of inlined method bodies:
//   ZKDatabase.commit() -> snapLog.commit() -> txnLog.commit().
// ZKDatabase.commit: delegate.
this.snapLog.commit();
// snapLog.commit: delegate to the txn log.
txnLog.commit();
// txnLog.commit body: push everything appended so far toward disk.
// Flush the current log file's write buffer into its FileOutputStream.
if (logStream != null)
{
logStream.flush();
}
// Every log file registered by append() that may still hold unsynced data.
for (FileOutputStream log : streamsToFlush)
{
// Flush the stream itself.
log.flush();
// fsync only when forceSync is enabled (the original note says this depends on an
// environment/system setting — confirm).
if (forceSync)
{
// Start time, to measure fsync latency.
long startSyncNS = System.nanoTime();
FileChannel channel = log.getChannel();
// force(false): force the file's content to storage; metadata updates are not required.
channel.force(false);
syncElapsedMS
= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startSyncNS);
// A slow fsync hurts operation latency: count it and warn.
if (syncElapsedMS > fsyncWarningThresholdMS)
{
if (serverStats != null)
{
serverStats.incrementFsyncThresholdExceedCount();
}
// One properly concatenated literal. The previous copy wrapped the literal across
// physical lines, which is not legal Java, and lacked the space before "File".
LOG.warn("fsync-ing the write ahead log in {} took {}ms which will adversely effect operation latency. "
+ "File size is {} bytes. See the ZooKeeper troubleshooting guide", Thread.currentThread().getName(),
syncElapsedMS, channel.size());
}
ServerMetrics.getMetrics().FSYNC_TIME.add(syncElapsedMS);
}
}
// Close every flushed file except the newest one, which presumably is the file still
// being appended to (assumes streamsToFlush polls in insertion order).
while (streamsToFlush.size() > 1)
{
streamsToFlush.poll().close();
}
// With a size limit configured, roll the log once the current file exceeds it.
if (txnLogSizeLimit > 0)
{
long logSize = getCurrentLogSize();
if (logSize > txnLogSizeLimit)
{
LOG.debug("Log size limit reached: {}", logSize);
rollLog();
}
}
flush
// SyncRequestProcessor.flush (fragment): commit the txn log, then hand every batched
// request to the next processor in the order it was queued.
// Nothing batched: nothing to do.
if (this.toFlush.isEmpty())
{
return;
}
ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
// Start time, for the flush-time metric.
long flushStartTime = Time.currentElapsedTime();
// Commit the txn log: flush buffered entries (fsync'd when forceSync is on).
zks.getZKDatabase().commit();
ServerMetrics.getMetrics().
SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
// Per the original note, the next processor is usually the AckRequestProcessor.
if (this.nextProcessor == null)
{
// No downstream stage: just discard the batch.
this.toFlush.clear();
}
else
{
// The batched txns were just written to the log in order, so they can now be
// passed downstream in that same order (durability before hand-off).
while (!this.toFlush.isEmpty())
{
final Request i = this.toFlush.remove();
long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
// A txn is handed on only after the commit above.
this.nextProcessor.processRequest(i);
}
// If the next stage supports flushing, flush it too.
if (this.nextProcessor instanceof Flushable)
{
((Flushable) this.nextProcessor).flush();
}
}
// Remember when this flush finished.
lastFlushTime = Time.currentElapsedTime();
shouldFlush
// SyncRequestProcessor.shouldFlush (fragment): decide whether the pending batch is due.
// It is due when either configured limit is hit:
//   - a flush delay is set (> 0) and no delay remains, or
//   - a max batch size is set (> 0) and toFlush has reached it.
long delay = zks.getFlushDelay();
long batchLimit = zks.getMaxBatchSize();
boolean delayElapsed = delay > 0 && getRemainingDelay() == 0;
return delayElapsed || (batchLimit > 0 && toFlush.size() >= batchLimit);
AckRequestProcessor
// AckRequestProcessor.processRequest (fragment): once a request reaches this stage
// (presumably after SyncRequestProcessor has flushed it), the leader acks its own write.
QuorumPeer peer = leader.self;
if (peer == null)
{
    LOG.error("Null QuorumPeer");
}
else
{
    request.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
    // Ack from the leader's own server id for this zxid; no follower address applies.
    leader.processAck(peer.getId(), request.zxid, null);
}
processAck
// Leader.processAck (fragment): record that server `sid` acked `zxid`, then try to commit.
// From usage: sid = acking server id; zxid = acked proposal; followerAddr appears only in
// log output (null when the leader acks itself).
// Commits have been disabled (tryToCommit clears allowedToCommit when this leader is no
// longer the designated leader after a reconfig): ignore acks.
if (!allowedToCommit)
{
return;
}
// Trace-level dump of the acked zxid and every outstanding proposal.
if (LOG.isTraceEnabled())
{
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values())
{
long packetZxid = p.packet.getZxid();
LOG.trace("outstanding proposal: 0x{}", Long.toHexString(packetZxid));
}
LOG.trace("outstanding proposals all");
}
// Counter (low 32 bits) is 0. Per the original note this is the leader's first packet
// of the epoch, not a regular proposal, so the ack is ignored.
if ((zxid & 0xffffffffL) == 0)
{
return;
}
// Nothing is awaiting acks.
if (outstandingProposals.size() == 0)
{
LOG.debug("outstanding is 0");
return;
}
// Late ack for a zxid that has already been committed.
if (lastCommitted >= zxid)
{
LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
Long.toHexString(lastCommitted), Long.toHexString(zxid));
return;
}
// Look up the outstanding proposal for this zxid.
Proposal p = outstandingProposals.get(zxid);
if (p == null)
{
LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
Long.toHexString(zxid), followerAddr);
return;
}
// Sampled latency logging: only zxids that are multiples of ackLoggingFrequency.
if (ackLoggingFrequency > 0 && (zxid % ackLoggingFrequency == 0))
{
p.request.logLatency(ServerMetrics.getMetrics().ACK_LATENCY, Long.toString(sid));
}
// Add sid to the set of servers that acked this proposal.
p.addAck(sid);
// Commit if the quorum is complete and zxid ordering allows it.
boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
// Reconfiguration only (not covered in these notes): after a reconfig commits, try to
// commit the proposals that follow it in zxid order. Stop at the first one that is
// missing or cannot commit, or when commits become disallowed.
if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig)
{
long curZxid = zxid;
while (allowedToCommit && hasCommitted && p != null)
{
curZxid++;
p = outstandingProposals.get(curZxid);
if (p != null)
{
hasCommitted = tryToCommit(p, curZxid, null);
}
}
}
tryToCommit
// Leader.tryToCommit (fragment): commit proposal p for zxid if it is next in order and
// fully acked; returns true when committed. The bodies of commit(zxid), inform(p) and
// sendSync(r) are pasted inline after their calls. They keep the callee's own names
// (e.g. `proposal` for p) and redeclare `qp`, so this fragment does not compile as-is.
// Commits are strictly ordered: if zxid - 1 is still outstanding, this zxid must wait.
if (outstandingProposals.containsKey(zxid - 1))
{
return false;
}
// Not yet acked by a quorum (presumably of every verifier attached in propose()).
if (!p.hasAllQuorums())
{
return false;
}
// Sanity check: warn when zxid is not exactly lastCommitted + 1.
if (zxid != lastCommitted + 1)
{
LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr);
LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
}
// The proposal is no longer awaiting acks.
outstandingProposals.remove(zxid);
if (p.request != null)
{
// Queue the proposal on toBeApplied (presumably committed but not yet applied).
toBeApplied.add(p);
}
if (p.request == null)
{
LOG.warn("Going to commit null: {}", p);
}
// Reconfiguration path (not covered in these notes).
else if (p.request.getHdr().getType() == OpCode.reconfig)
{
LOG.debug("Committing a reconfiguration! {}", outstandingProposals.size());
Long designatedLeader = getDesignatedLeader(p, zxid);
QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();
self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
if (designatedLeader != self.getId())
{
// NOTE(review): the format string below is wrapped across physical lines in these
// notes, which is not legal Java; upstream it is a single concatenated literal.
LOG.info(String.format("Committing a reconfiguration (reconfigEnabled=%s);
this leader is not the designated " + "leader anymore, setting allowedToCommit=false",
self.isReconfigEnabled()));
allowedToCommit = false;
}
commitAndActivate(zxid, designatedLeader);
informAndActivate(p, designatedLeader);
}
else
{
p.request.logLatency(ServerMetrics.getMetrics().QUORUM_ACK_LATENCY);
commit(zxid);
// Inlined body of commit(zxid): advance lastCommitted, then broadcast COMMIT.
synchronized (this)
{
lastCommitted = zxid;
}
QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
// Send a Leader.COMMIT packet for zxid. The original note says it goes to every
// follower and observer; NOTE(review): observers appear to be told via INFORM
// below instead — confirm.
sendPacket(qp);
ServerMetrics.getMetrics().COMMIT_COUNT.add(1);
inform(p);
// Inlined body of inform(proposal): send observers a Leader.INFORM packet
// carrying the proposal's zxid and data.
QuorumPacket qp =
new QuorumPacket(Leader.INFORM, proposal.request.zxid, proposal.packet.getData(), null);
sendObserverPacket(qp);
}
// The leader proposes every write to itself and to the participants. Once a quorum
// (itself included) has acked, the leader commits and tells the participants to commit.
// Here the request is handed to the commitProcessor, which commits queued requests one
// at a time.
zk.commitProcessor.commit(p.request);
// Sync requests parked on this zxid (see processSync) can now be answered.
if (pendingSyncs.containsKey(zxid))
{
for (LearnerSyncRequest r : pendingSyncs.remove(zxid))
{
// This zxid has entered commit processing, so reply SYNC to each waiter.
sendSync(r);
// Inlined body of sendSync(r).
QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
r.fh.queuePacket(qp);
}
}
return true;



