- 2021SC@SDUSC
- Preface
- SelectorThread
- Summary
Preface
This chapter steps into the source code to trace, step by step, how the client and server establish network communication through ClientCnxn/ServerCnxn. The previous chapter covered how the client sends a request to the server; this chapter covers how the server responds to that request, and the next chapter will cover what the client does after receiving the server's response.
SelectorThread
As described in the previous chapter, when the client sends a request to the server, it writes the request packet into a buffer. The server then uses a SelectorThread to read the buffer's contents and drive the subsequent processing. By default the SelectorThread is the NIO implementation, managed by the server connection factory, i.e. implemented in NIOServerCnxnFactory. The implementation details of SelectorThread were covered in the earlier NIOServerCnxnFactory source analysis and are not repeated here; only the parts relevant to this chapter are discussed.
① As introduced in the NIOServerCnxnFactory source analysis, the run() method of SelectorThread first calls select() to watch connected sockets for read or write events. In our scenario select() detects a read event, so the condition below holds and handleIO() is called.
if (key.isReadable() || key.isWritable()) {
handleIO(key);
}
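The select-then-dispatch pattern can be shown in a self-contained sketch (illustrative code, not ZooKeeper's; the class name SelectorDemo and the loopback setup are invented): a channel registered for OP_READ becomes selectable as soon as the peer writes data, which is exactly the state in which handleIO() is entered.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SelectorDemo {

    // Sends one byte over loopback and returns what the readable key yields.
    static int roundTrip(byte value) {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel conn = server.accept()) {
                client.write(ByteBuffer.wrap(new byte[] {value}));
                conn.configureBlocking(false);
                conn.register(selector, SelectionKey.OP_READ);
                selector.select();                 // blocks until an event fires
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isReadable()) {        // the same check SelectorThread makes
                        ByteBuffer buf = ByteBuffer.allocate(1);
                        ((SocketChannel) key.channel()).read(buf);
                        return buf.get(0);
                    }
                }
            }
            return -1;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip((byte) 42)); // prints 42
    }
}
```

In ZooKeeper the accept thread plays the role of the accept() call here, and the SelectorThread plays the role of the select loop.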
② In handleIO(), a worker from the worker thread pool is dispatched to handle the event. The class that carries the event is ScheduledWorkRequest, whose run() method ultimately calls workRequest.doWork() to process the request.
Step 1: call workerPool.schedule(workRequest);
private void handleIO(SelectionKey key) {
IOWorkRequest workRequest = new IOWorkRequest(this, key);
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
//stop selecting this key while its connection is being processed
cnxn.disableSelectable();
key.interestOps(0);
touchCnxn(cnxn);
//dispatch to a worker in the worker thread pool
workerPool.schedule(workRequest);
}
Step 2: call schedule(workRequest, 0);
public void schedule(WorkRequest workRequest) {
schedule(workRequest, 0);
}
Step 3: if the worker pool has workers, call worker.execute(scheduledWorkRequest); if the pool has no workers, run scheduledWorkRequest.run() directly.
public void schedule(WorkRequest workRequest, long id) {
if (stopped) {
workRequest.cleanup();
return;
}
// wrap the work request
ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
//if the pool has workers, hand the work to one of them; otherwise do the work directly
int size = workers.size();
if (size > 0) {
try {
// make sure to map negative ids as well to [0, size-1]
int workerNum = ((int) (id % size) + size) % size;
ExecutorService worker = workers.get(workerNum);
worker.execute(scheduledWorkRequest);
} catch (RejectedExecutionException e) {
LOG.warn("ExecutorService rejected execution", e);
workRequest.cleanup();
}
} else {
// When there is no worker thread pool, do the work directly
// and wait for its completion
scheduledWorkRequest.run();
}
}
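The worker-selection expression deserves a closer look. The sketch below (class and method names invented) shows why ((int) (id % size) + size) % size maps any long id, including negative ones, into [0, size - 1]: Java's % keeps the sign of the dividend, so a single modulo is not enough.

```java
// Standalone sketch, not ZooKeeper code: demonstrates the worker-index
// mapping used by WorkerService.schedule().
public class WorkerIndexDemo {

    // The same expression schedule() uses to pick a worker for a given id.
    static int workerIndex(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        int size = 4;
        // Java's % keeps the dividend's sign: -6 % 4 == -2, so adding size
        // and taking % again folds the result back into [0, size - 1].
        System.out.println(workerIndex(6, size));   // 2
        System.out.println(workerIndex(-6, size));  // 2
        System.out.println(workerIndex(-1, size));  // 3
    }
}
```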
Step 4: call workRequest.doWork();
public void run() {
try {
// Check if stopped while request was on queue
if (stopped) {
workRequest.cleanup();
return;
}
//process the request
workRequest.doWork();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
workRequest.cleanup();
}
}
}
Step 5: the NIO-based WorkRequest in NIOServerCnxnFactory is the IOWorkRequest class. Its details were covered in the NIOServerCnxnFactory analysis and are not repeated here; its job is simply to let the worker do the actual work. The core of that work is the doIO() method, invoked as cnxn.doIO(key);
public void doWork() throws InterruptedException {
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
if (key.isReadable() || key.isWritable()) {
//perform the I/O, the core operation
cnxn.doIO(key);
// check whether the factory stopped or doIO() closed this connection
if (stopped) {
cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
return;
}
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
touchCnxn(cnxn);
}
//mark this connection as once again ready for selection
cnxn.enableSelectable();
// Push an update request on the queue to resume selecting
// on the current set of interest ops, which may have changed
// as a result of the I/O operations we just performed.
if (!selectorThread.addInterestOpsUpdateRequest(key)) {
cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
}
}
Step 6: the NIO-based doIO() method was analyzed in detail in the NIOServerCnxn source analysis; it reads from or writes to the buffer. In the scenario at hand it is clearly a read, and the method that reads the data content (the payload) is readPayload().
void doIO(SelectionKey k) throws InterruptedException {
……
……
if (k.isReadable()) { //the key is readable
//read the socket contents into the incoming buffer
int rc = sock.read(incomingBuffer);
if (rc < 0) { //end of stream: unable to read data from the client
handleFailedRead();
}
//the buffer has been filled
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
//start of the next request
if (incomingBuffer == lenBuffer) {
//flip the buffer so it can be read
incomingBuffer.flip();
//read the first four bytes of lenBuffer; true if they encode a payload length, false otherwise
isPayload = readLength(k);
//clear the buffer
incomingBuffer.clear();
} else {
//readLength has already reallocated incomingBuffer based on the length
isPayload = true;
}
if (isPayload) { //not a four-letter word: actual request content
//read the payload
readPayload();
} else {
//a four-letter admin command
return;
}
}
}
……
……
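The two-phase read above — fill a fixed 4-byte length buffer, then a payload buffer of exactly the announced size — can be sketched in isolation (hypothetical demo code; frame/readFrame are invented names standing in for the sender side and for readLength() plus the payload read):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FramingDemo {

    // Sender side: 4-byte big-endian length prefix followed by the payload.
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer wire = ByteBuffer.allocate(4 + payload.length);
        wire.putInt(payload.length).put(payload);
        wire.flip();
        return wire;
    }

    // Receiver side, mirroring doIO(): fill the 4-byte lenBuffer first,
    // then allocate incomingBuffer to exactly that length and fill it.
    static byte[] readFrame(ByteBuffer wire) {
        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
        while (lenBuffer.hasRemaining()) lenBuffer.put(wire.get());
        lenBuffer.flip();
        int len = lenBuffer.getInt();            // what readLength() computes
        ByteBuffer incomingBuffer = ByteBuffer.allocate(len);
        while (incomingBuffer.hasRemaining()) incomingBuffer.put(wire.get());
        incomingBuffer.flip();
        byte[] payload = new byte[len];
        incomingBuffer.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        byte[] out = readFrame(frame("hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(new String(out, StandardCharsets.UTF_8)); // hello
    }
}
```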
Step 7: readPayload() was also covered in the NIOServerCnxn source analysis; its job is to read the payload carried by the request packet. At this point the server has fully received the connect request packet (or an ordinary request packet) sent by the client, and it then calls readConnectRequest() (or readRequest()) to read it. In the given scenario, readConnectRequest() is clearly the one called.
private void readPayload() throws IOException, InterruptedException, ClientCnxnLimitException {
// the socket content has not been fully read yet
if (incomingBuffer.remaining() != 0) { // have we read length bytes?
// read the socket contents into the buffer
int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
// end of stream: unable to read data from the client
if (rc < 0) {
handleFailedRead();
}
}
// the expected number of bytes has been read
if (incomingBuffer.remaining() == 0) { // have we read length bytes?
incomingBuffer.flip();
// a packet has been received
packetReceived(4 + incomingBuffer.remaining());
// connection not yet initialized
if (!initialized) {
// read the connect request
readConnectRequest();
} else {
// read an ordinary request
readRequest();
}
// clear the length buffer
lenBuffer.clear();
// point incomingBuffer back at lenBuffer, i.e. reset the incoming buffer
incomingBuffer = lenBuffer;
}
}
Step 8: call processConnectRequest(), which lives in ZooKeeperServer — naturally so, since every ZooKeeperServer must be able to handle connect requests (and any other requests) from clients.
private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
// handle the connect request
zkServer.processConnectRequest(this, incomingBuffer);
initialized = true;
}
Step 9: handle the connect request; createSession() is called to create the session.
public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
throws IOException, ClientCnxnLimitException {
// a ConnectRequest packet has no header, so deserialize the body directly
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
LOG.debug(
"Session establishment request from client {} client's lastZxid is 0x{}",
cnxn.getRemoteSocketAddress(),
Long.toHexString(connReq.getLastZxidSeen()));
// session id requested by the client; 0 means a brand-new session
long sessionId = connReq.getSessionId();
int tokensNeeded = 1;
if (connThrottle.isConnectionWeightEnabled()) {
if (sessionId == 0) {
if (localSessionEnabled) {
tokensNeeded = connThrottle.getRequiredTokensForLocal();
} else {
tokensNeeded = connThrottle.getRequiredTokensForGlobal();
}
} else {
tokensNeeded = connThrottle.getRequiredTokensForRenew();
}
}
if (!connThrottle.checkLimit(tokensNeeded)) {
throw new ClientCnxnLimitException();
}
ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
boolean readonly = false;
try {
readonly = bia.readBool("readOnly");
cnxn.isOldClient = false;
} catch (IOException e) {
// this is ok -- just a packet from an old client which
// doesn't contain readonly field
LOG.warn(
"Connection request from old client {}; will be dropped if server is in r-o mode",
cnxn.getRemoteSocketAddress());
}
if (!readonly && this instanceof ReadOnlyZooKeeperServer) {
String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
}
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
+ " as it has seen zxid 0x"
+ Long.toHexString(connReq.getLastZxidSeen())
+ " our last zxid is 0x"
+ Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ " client must try another server";
LOG.info(msg);
throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
}
// apply the session parameters requested by the client
int sessionTimeout = connReq.getTimeOut();
byte[] passwd = connReq.getPasswd();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
sessionTimeout = maxSessionTimeout;
}
cnxn.setSessionTimeout(sessionTimeout);
//stop reading further requests until the session is established, similar to disabling interrupts in an OS
cnxn.disableRecv();
if (sessionId == 0) {
// create a new session
long id = createSession(cnxn, passwd, sessionTimeout);
LOG.debug(
"Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(id),
Long.toHexString(connReq.getLastZxidSeen()),
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
} else {
long clientSessionId = connReq.getSessionId();
LOG.debug(
"Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
Long.toHexString(clientSessionId),
Long.toHexString(connReq.getLastZxidSeen()),
connReq.getTimeOut(),
cnxn.getRemoteSocketAddress());
if (serverCnxnFactory != null) {
serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
if (secureServerCnxnFactory != null) {
secureServerCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
}
//attach the existing session id to this connection; reading resumes once the session is revalidated
cnxn.setSessionId(sessionId);
//reopen (revalidate) the session
reopenSession(cnxn, sessionId, passwd, sessionTimeout);
ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
}
}
Step 10: after the session is created, submitRequest() is called to hand the request to the server's processor chain, which then starts executing it.
long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
if (passwd == null) {
// Possible since it's just deserialized from a packet on the wire.
passwd = new byte[0];
}
// create the session; the session tracker assigns an increasing session id
long sessionId = sessionTracker.createSession(timeout);
//random password, seeded deterministically from the session id
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
//each server connection has exactly one session id
cnxn.setSessionId(sessionId);
//submit the request to the downstream processor chain
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
submitRequest(si);
return sessionId;
}
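Because the password comes from a Random seeded with sessionId ^ superSecret, the server never needs to store it: re-seeding with the same value regenerates the identical bytes, which is how a reconnecting client's password can be checked. A standalone sketch (the SUPER_SECRET constant below is a made-up value for the demo, not ZooKeeper's actual secret):

```java
import java.util.Arrays;
import java.util.Random;

public class SessionPasswordDemo {

    static final long SUPER_SECRET = 0xB3415C00L; // hypothetical demo value

    // Derive the password deterministically from the session id.
    static byte[] generatePasswd(long sessionId) {
        byte[] passwd = new byte[16];
        new Random(sessionId ^ SUPER_SECRET).nextBytes(passwd);
        return passwd;
    }

    // Re-seeding with the same value reproduces the same byte sequence,
    // so validation is just regeneration plus comparison.
    static boolean checkPasswd(long sessionId, byte[] passwd) {
        return Arrays.equals(passwd, generatePasswd(sessionId));
    }

    public static void main(String[] args) {
        byte[] p = generatePasswd(42L);
        System.out.println(checkPasswd(42L, p)); // true
        System.out.println(checkPasswd(43L, p)); // false
    }
}
```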
Step 11: entering the request chain — submitRequest() enqueues the request, and enqueueRequest() in turn calls requestThrottler.submitRequest(si);
public void submitRequest(Request si) {
enqueueRequest(si);
}
public void enqueueRequest(Request si) {
if (requestThrottler == null) {
synchronized (this) {
try {
//since all requests pass through the request processor chain, wait until the chain has been set up; once installed, the state is updated to RUNNING
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (requestThrottler == null) {
throw new RuntimeException("Not started");
}
}
}
requestThrottler.submitRequest(si);
}
Step 12: when the server actually submits the request, submitRequestNow() is called, which invokes firstProcessor.processRequest(); the first processor starts preprocessing the request.
public void submitRequestNow(Request si) {
……
……
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
setLocalSessionFlag(si);
//hand off to the downstream processors, usually asynchronously for better performance
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Received packet at server of unknown type {}", si.type);
// Update request accounting/throttling limits
requestFinished(si);
new UnimplementedRequestProcessor().processRequest(si);
}
……
……
}
Step 13: PrepRequestProcessor extends ZooKeeperCriticalThread, so it is itself a thread; step 12 handed the request to it. When the thread runs, its run() method takes requests off the submitted-request queue and calls pRequest() to preprocess them.
public void run() {
……
while (true) {
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
// take a submitted request from the queue
Request request = submittedRequests.take();
ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME
.add(Time.currentElapsedTime() - request.prepQueueStartTime);
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
//time when preprocessing starts
request.prepStartTime = Time.currentElapsedTime();
//preprocess the request
pRequest(request);
}
……
}
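The loop above is a classic blocking-queue consumer shut down by a poison pill (Request.requestOfDeath). A minimal standalone sketch of the same pattern (all names invented):

```java
import java.util.concurrent.LinkedBlockingQueue;

public class PoisonPillDemo {

    static final String REQUEST_OF_DEATH = "DEATH"; // sentinel, like Request.requestOfDeath

    // Feed the given requests plus the sentinel through a blocking queue
    // and return the concatenation of everything the worker "processed".
    static String drain(String... requests) {
        LinkedBlockingQueue<String> submittedRequests = new LinkedBlockingQueue<>();
        StringBuilder processed = new StringBuilder();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String r = submittedRequests.take();  // blocks when the queue is empty
                    if (r == REQUEST_OF_DEATH) break;     // identity check, as ZooKeeper does
                    processed.append(r);                  // stand-in for pRequest(r)
                }
            } catch (InterruptedException ignored) { }
        });
        worker.start();
        try {
            for (String r : requests) submittedRequests.put(r);
            submittedRequests.put(REQUEST_OF_DEATH);
            worker.join();                                // join gives visibility of `processed`
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return processed.toString();
    }

    public static void main(String[] args) {
        System.out.println(drain("a", "b", "c")); // abc
    }
}
```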
Step 14: the pRequest() preprocessing method switches over many opcodes; since the given scenario is session creation, we analyze the createSession branch. Note that clients cannot issue the createSession or closeSession opcodes directly — the server generates these requests itself. The core task is to repackage the request so later processors can consume it, which is done by calling pRequest2Txn().
protected void pRequest(Request request) throws RequestProcessorException {
……
case OpCode.createSession:
case OpCode.closeSession:
if (!request.isLocalSession()) {
//assemble the Request's transaction header and txn body here, for the downstream processors
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
}
break;
……
//the request's zxid
request.zxid = zks.getZxid();
ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - request.prepStartTime);
//let the next processor handle it, usually asynchronously for better performance
nextProcessor.processRequest(request);
Step 15: since the given scenario is session creation, here is the createSession branch in detail.
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
……
case OpCode.createSession:
//read the session timeout from the request buffer
request.request.rewind();
int to = request.request.getInt();
//assemble the concrete Record implementation — here a CreateSessionTxn — for the downstream processors
request.setTxn(new CreateSessionTxn(to));
request.request.rewind();
// only add the global session tracker but not to ZKDb
zks.sessionTracker.trackSession(request.sessionId, to);
zks.setOwner(request.sessionId, request.getOwner());
break;
……
}
Step 16: the nextProcessor.processRequest(request) call from step 14 hands the repackaged request to the next processor, SyncRequestProcessor. As with preprocessing, when SyncRequestProcessor runs, its run() method does the work; its core job is persisting requests to the transaction log and passing them on.
public void run() {
try {
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
resetSnapshotStats();
lastFlushTime = Time.currentElapsedTime();
while (true) {
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
//fetch a request that has been through preprocessing
long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
//if there is no request, flush and then block waiting, meaning all earlier requests have been handled
if (si == null) {
flush();
si = queuedRequests.take();
}
//the request-of-death sentinel: exit immediately
if (si == REQUEST_OF_DEATH) {
break;
}
//time when processing starts
long startProcessTime = Time.currentElapsedTime();
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime);
//append the Request to the log output stream (serialize, then append); at this point it is not yet flushed to disk, only in memory
if (zks.getZKDatabase().append(si)) {
if (shouldSnapshot()) {
resetSnapshotStats();
//flush the in-memory log to disk
zks.getZKDatabase().rollLog();
// take a snapshot
if (!snapThreadMutex.tryAcquire()) {
LOG.warn("Too busy to snap, skipping");
} else {
new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
} finally {
snapThreadMutex.release();
}
}
}.start();
}
}
} else if (toFlush.isEmpty()) { //a read request (nothing was appended) and the flush queue is empty: pass it straight through
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable) nextProcessor).flush();
}
}
continue;
}
//write requests appended to the log above are added to the flush queue here, to be handled in batches
toFlush.add(si);
//under heavy load several requests may accumulate per flush; once flushed they can be handled by the next processor
if (shouldFlush()) {
flush();
}
ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
}
} catch (Throwable t) {
handleException(this.getName(), t);
}
LOG.info("SyncRequestProcessor exited!");
}
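The batching behavior can be reduced to a toy sketch (class name and threshold invented): requests accumulate in a pending batch and only move to the downstream processor when a flush happens, either because the batch is full or because the queue ran empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class BatchFlushDemo {

    static final int FLUSH_THRESHOLD = 3;              // stand-in for shouldFlush()

    final Queue<String> toFlush = new ArrayDeque<>();  // pending, logged-but-unflushed
    final List<String> downstream = new ArrayList<>(); // stand-in for nextProcessor

    // Drain the pending batch to the downstream processor.
    void flush() {
        while (!toFlush.isEmpty()) downstream.add(toFlush.remove());
    }

    // Mirrors the write path: append to the pending batch, flush when full.
    void process(String req) {
        toFlush.add(req);
        if (toFlush.size() >= FLUSH_THRESHOLD) flush();
    }

    public static void main(String[] args) {
        BatchFlushDemo p = new BatchFlushDemo();
        for (String r : new String[] {"r1", "r2", "r3", "r4"}) p.process(r);
        System.out.println(p.downstream); // [r1, r2, r3] — r4 is still pending
        p.flush();                        // the "queue ran empty" path flushes the rest
        System.out.println(p.downstream); // [r1, r2, r3, r4]
    }
}
```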
Step 17: whenever requests have been flushed to disk, the next processor, FinalRequestProcessor, is invoked for each of them.
private void flush() throws IOException, RequestProcessorException {
if (this.toFlush.isEmpty()) {
return;
}
ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
//time when the flush starts
long flushStartTime = Time.currentElapsedTime();
//flush the previously appended log records to disk, closing the old log file handle along the way
zks.getZKDatabase().commit();
ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
//if there is no next processor, just clear the queued requests
if (this.nextProcessor == null) {
this.toFlush.clear();
} else {
//drain the queued requests
while (!this.toFlush.isEmpty()) {
final Request i = this.toFlush.remove();
long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
//hand each request to the next processor
this.nextProcessor.processRequest(i);
}
if (this.nextProcessor instanceof Flushable) {
((Flushable) this.nextProcessor).flush();
}
//time when the flush finished
lastFlushTime = Time.currentElapsedTime();
}
}
Step 18: once FinalRequestProcessor has the result from the database, it calls zks.finishSessionInit(request.cnxn, true) to write the response back to the client through NIOServerCnxn.
case OpCode.createSession: {
lastOp = "SESS";
updateStats(request, lastOp, lastZxid);
zks.finishSessionInit(request.cnxn, true);
return;
}
Step 19: write the response into the connection's outgoing buffer.
public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
……
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
//write back through the channel
cnxn.sendBuffer(bb);
if (valid) {
LOG.debug(
"Established session 0x{} with negotiated timeout {} for client {}",
Long.toHexString(cnxn.getSessionId()),
cnxn.getSessionTimeout(),
cnxn.getRemoteSocketAddress());
//re-enable read events on the selector
cnxn.enableRecv();
}
……
}
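The bb.putInt(bb.remaining() - 4).rewind() line above is a length-backpatch idiom: serialize with a 4-byte hole at the front, then write total - 4 into the hole and rewind so the buffer is ready to send. A standalone sketch (class name invented):

```java
import java.nio.ByteBuffer;

public class LengthBackpatchDemo {

    // Serialize a body behind a 4-byte hole, then backpatch length = total - 4
    // and rewind, leaving the buffer positioned for sending.
    static ByteBuffer packet(byte[] body) {
        ByteBuffer bb = ByteBuffer.allocate(4 + body.length);
        bb.position(4);                          // leave room for the length prefix
        bb.put(body);
        bb.flip();                               // position = 0, limit = 4 + body.length
        bb.putInt(bb.remaining() - 4).rewind();  // the finishSessionInit idiom
        return bb;
    }

    public static void main(String[] args) {
        ByteBuffer bb = packet(new byte[] {1, 2, 3, 4, 5});
        System.out.println(bb.getInt()); // 5, the body length
    }
}
```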
Step 20: NIOServerCnxn's sendBuffer() queues the response buffers onto the connection's outgoingBuffers, and requestInterestOpsUpdate() asks the selector thread to start watching for writability so the data is sent to the client.
public void sendBuffer(ByteBuffer... buffers) {
if (LOG.isTraceEnabled()) {
LOG.trace("Add a buffer to outgoingBuffers, sk {} is valid: {}", sk, sk.isValid());
}
synchronized (outgoingBuffers) {
for (ByteBuffer buffer : buffers) {
outgoingBuffers.add(buffer);
}
outgoingBuffers.add(packetSentinel);
}
requestInterestOpsUpdate();
}
Summary
The above is the complete flow of the server responding to a client's connect request. The server handles other client requests the same way, with only the opcode and a few minor method calls differing. The next chapter covers what the client does after it receives the server's response.



