Inside pollSelectionKeys.attemptRead, the channel.read method is called; receive.complete() checks whether the received message is complete.
public NetworkReceive read() throws IOException {
NetworkReceive result = null;
// Initialize the NetworkReceive object
if (receive == null) {
receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
}
// Receive data
receive(receive);
if (receive.complete()) {
receive.payload().rewind();
result = receive;
receive = null;
} else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
//pool must be out of memory, mute ourselves.
mute();
}
return result;
}
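The receive method, through readFrom, solves the TCP sticky-packet and split-packet problem. size is a fixed-size ByteBuffer (4 bytes, read with getInt()) that holds the length of the message body. While size still has remaining space, the length header has not been fully read; this covers the case where the header itself arrives split across several reads. Once size's position equals its limit, the header is complete: it is rewound and the full length is read. A ByteBuffer of that length is then allocated, and data is read from the channel into it.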
public long readFrom(ScatteringByteChannel channel) throws IOException {
int read = 0;
if (size.hasRemaining()) {
int bytesRead = channel.read(size);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
if (!size.hasRemaining()) {
size.rewind();
int receiveSize = size.getInt();
if (receiveSize < 0)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
if (maxSize != UNLIMITED && receiveSize > maxSize)
throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
if (receiveSize == 0) {
buffer = EMPTY_BUFFER;
}
}
}
if (buffer == null && requestedBufferSize != -1) { //we know the size we want but haven't been able to allocate it yet
buffer = memoryPool.tryAllocate(requestedBufferSize);
if (buffer == null)
log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
}
if (buffer != null) {
int bytesRead = channel.read(buffer);
if (bytesRead < 0)
throw new EOFException();
read += bytesRead;
}
return read;
}
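To make the framing logic concrete, here is a minimal, self-contained sketch of the same idea. It is not Kafka's code: FramedReceive, ChunkedChannel and FramingDemo are hypothetical names, the memory pool and maximum-size checks are left out, and the payloads are assumed to be UTF-8 strings. It only shows how a 4-byte size header followed by an exactly sized payload buffer copes with both split reads and back-to-back messages.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for NetworkReceive: 4-byte size header, then the payload. */
final class FramedReceive {
    private final ByteBuffer size = ByteBuffer.allocate(4); // fixed-size length header
    private ByteBuffer buffer;                               // allocated once the length is known

    /** Reads whatever the channel offers now; one message may need several calls. */
    long readFrom(ReadableByteChannel channel) throws IOException {
        long read = 0;
        if (size.hasRemaining()) {               // header not complete yet (possibly split)
            int n = channel.read(size);
            if (n < 0) throw new EOFException();
            read += n;
            if (!size.hasRemaining()) {          // position == limit: header is complete
                size.rewind();
                int length = size.getInt();
                if (length < 0) throw new IOException("Invalid receive (size = " + length + ")");
                buffer = ByteBuffer.allocate(length);
            }
        }
        if (buffer != null) {                    // payload: never reads past this message
            int n = channel.read(buffer);
            if (n < 0) throw new EOFException();
            read += n;
        }
        return read;
    }

    boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }

    ByteBuffer payload() { return buffer; }
}

/** Helper for the demo: hands out at most `chunk` bytes per read, imitating fragmented reads. */
final class ChunkedChannel implements ReadableByteChannel {
    private final ByteBuffer data;
    private final int chunk;

    ChunkedChannel(byte[] bytes, int chunk) {
        this.data = ByteBuffer.wrap(bytes);
        this.chunk = chunk;
    }

    @Override public int read(ByteBuffer dst) {
        if (!dst.hasRemaining()) return 0;
        if (!data.hasRemaining()) return -1;
        int n = Math.min(chunk, Math.min(dst.remaining(), data.remaining()));
        for (int i = 0; i < n; i++) dst.put(data.get());
        return n;
    }

    @Override public boolean isOpen() { return true; }
    @Override public void close() { }
}

final class FramingDemo {
    /** Encodes each message as [4-byte length][UTF-8 bytes], all concatenated. */
    static byte[] frame(String... messages) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String m : messages) {
            byte[] body = m.getBytes(StandardCharsets.UTF_8);
            out.write(ByteBuffer.allocate(4).putInt(body.length).array(), 0, 4);
            out.write(body, 0, body.length);
        }
        return out.toByteArray();
    }

    /** Decodes `count` messages, however the channel splits or merges the bytes. */
    static List<String> decodeAll(ReadableByteChannel channel, int count) {
        List<String> result = new ArrayList<>();
        FramedReceive receive = new FramedReceive();
        try {
            while (result.size() < count) {
                receive.readFrom(channel);
                if (receive.complete()) {
                    receive.payload().rewind();
                    result.add(StandardCharsets.UTF_8.decode(receive.payload()).toString());
                    receive = new FramedReceive(); // start the next message
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }
}
```

For example, FramingDemo.decodeAll(new ChunkedChannel(FramingDemo.frame("hello", "kafka"), 3), 2) feeds the decoder three bytes at a time, so the size header itself is split across two reads, yet both messages come out intact.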
receive.complete() decides whether the message is complete from the positions of size and buffer: both must be completely filled.
public boolean complete() {
return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
}
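Handling staged responses
In the client's send path, pollSelectionKeys has a step attemptRead(key, channel); the receive it returns is then placed into stagedReceives by addToStagedReceives. Each channel has its own queue holding these receives (fully received, but not yet exposed to the user).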
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
if (!stagedReceives.containsKey(channel))
stagedReceives.put(channel, new ArrayDeque<>());
Deque<NetworkReceive> deque = stagedReceives.get(channel);
deque.add(receive);
}
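The addToCompletedReceives method processes stagedReceives.
When stagedReceives is not empty, it takes the deque of NetworkReceive objects for each channel that is not explicitly muted and hands it to addToCompletedReceives(channel, deque), which moves receives into completedReceives.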
private void addToCompletedReceives() {
if (!this.stagedReceives.isEmpty()) {
Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
if (!explicitlyMutedChannels.contains(channel)) {
Deque<NetworkReceive> deque = entry.getValue();
addToCompletedReceives(channel, deque);
if (deque.isEmpty())
iter.remove();
}
}
}
}
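The staging step can be sketched in isolation as follows. This is a simplified model rather than the Selector itself: StagedReceivesSketch is a hypothetical class, channels and receives are plain strings, and it assumes that each poll round promotes at most one staged receive per unmuted channel (the inner addToCompletedReceives(channel, deque) is not shown in the text above, so that detail is an assumption).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model: strings stand in for KafkaChannel and NetworkReceive. */
final class StagedReceivesSketch {
    // One queue of fully received, not yet exposed receives per channel.
    private final Map<String, Deque<String>> stagedReceives = new LinkedHashMap<>();
    private final Set<String> mutedChannels = new HashSet<>();

    void stage(String channel, String receive) {
        stagedReceives.computeIfAbsent(channel, k -> new ArrayDeque<>()).add(receive);
    }

    void mute(String channel) { mutedChannels.add(channel); }

    void unmute(String channel) { mutedChannels.remove(channel); }

    /** One poll round: assumed to expose at most one receive per unmuted channel. */
    List<String> poll() {
        List<String> completedReceives = new ArrayList<>();
        Iterator<Map.Entry<String, Deque<String>>> iter = stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<String, Deque<String>> entry = iter.next();
            if (mutedChannels.contains(entry.getKey()))
                continue;                       // muted channels keep their receives staged
            Deque<String> deque = entry.getValue();
            completedReceives.add(deque.poll()); // oldest receive first
            if (deque.isEmpty())
                iter.remove();                  // drop empty queues, as the original does
        }
        return completedReceives;
    }
}
```

Staging "r1" and "r2" on one channel and "r3" on a muted channel yields "r1" on the first poll, "r2" on the second, and "r3" only after the second channel is unmuted. Keeping the queue per channel is what lets a muted channel hold back its data without blocking the others.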
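At this point the responses have been stored.
In NetworkClient, handleCompletedSends(responses, updatedNow) deals with sends that have gone out and do not expect a response.
In NetworkClient, handleCompletedReceives(responses, updatedNow) processes the responses obtained in pollSelectionKeys.
The step InFlightRequest req = inFlightRequests.completeNext(source) removes from inFlightRequests the InFlightRequest whose response has just been received.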
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        if (log.isTraceEnabled()) {
            log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                req.header.apiKey(), req.header.correlationId(), responseStruct);
        }
        // If the received response includes a throttle delay, throttle the connection.
        AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
        maybeThrottle(body, req.header.apiVersion(), req.destination, now);
        // Response to a metadata request: go and update the metadata
        if (req.isInternalRequest && body instanceof MetadataResponse)
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        // API version negotiation response
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // Ordinary response
            responses.add(req.completed(body, now));
    }
}
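The effect of completeNext(source) can be illustrated with a small model. InFlightSketch below is a hypothetical class, not Kafka's InFlightRequests: requests are reduced to their correlation ids, and the sketch assumes that responses on one connection arrive in the order the requests were sent, so the oldest in-flight request for a node is always the one being answered.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-node in-flight tracking; ids stand in for InFlightRequest. */
final class InFlightSketch {
    private final Map<String, Deque<Integer>> requests = new HashMap<>();

    /** Called when a request is sent: newest requests go to the front. */
    void add(String node, int correlationId) {
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(correlationId);
    }

    /** Called when a response arrives: removes and returns the oldest request for the node. */
    int completeNext(String node) {
        Deque<Integer> deque = requests.get(node);
        if (deque == null || deque.isEmpty())
            throw new IllegalStateException("There are no in-flight requests for node " + node);
        return deque.pollLast();
    }

    /** Number of requests still waiting for a response from the node. */
    int count(String node) {
        Deque<Integer> deque = requests.get(node);
        return deque == null ? 0 : deque.size();
    }
}
```

After add("node-1", 1) and add("node-1", 2), the first completeNext("node-1") returns 1 and the second returns 2, leaving nothing in flight for that node.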
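The callback is passed in through the Kafka producer's send path, carried all the way into the RecordAccumulator, and stored in a Thunk; it is triggered during NetworkClient's completeResponses stage: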
completeResponses(responses)
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
Next, look at handleProduceResponse: when there is a response, each partition's partResp is handled by the completeBatch method.
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
    RequestHeader requestHeader = response.requestHeader();
    long receivedTimeMs = response.receivedTimeMs();
    int correlationId = requestHeader.correlationId();
    if (response.wasDisconnected()) {
        // omitted
    } else {
        log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
        // if we have a response, parse it
        if (response.hasResponse()) {
            ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
            for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                TopicPartition tp = entry.getKey();
                ProduceResponse.PartitionResponse partResp = entry.getValue();
                ProducerBatch batch = batches.get(tp);
                completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
            }
            this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
        } else {
            // No response is expected: complete the batches directly
            // this is the acks = 0 case, just complete all requests
            for (ProducerBatch batch : batches.values()) {
                completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
            }
        }
    }
}
The most important part of the completeBatch method is this:
if (batch.done(response.baseOffset, response.logAppendTime, null)) {
maybeRemoveFromInflightBatches(batch);
this.accumulator.deallocate(batch);
}
Inside the done method, completeFutureAndFireCallbacks is called:
completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
And here the callback.onCompletion callbacks finally get invoked!
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
// Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
produceFuture.set(baseOffset, logAppendTime, exception);
// Each record corresponds to one Thunk; loop over them and run each record's callback
for (Thunk thunk : thunks) {
try {
if (exception == null) {
RecordMetadata metadata = thunk.future.value();
if (thunk.callback != null)
thunk.callback.onCompletion(metadata, null);
} else {
if (thunk.callback != null)
thunk.callback.onCompletion(null, exception);
}
} catch (Exception e) {
log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
}
}
produceFuture.done();
}
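The callback loop above boils down to the pattern in the following sketch. BatchSketch and its nested Callback interface are hypothetical simplifications, not the producer's classes: the metadata handed to each callback is reduced to a single offset, which is assumed here to be the batch's base offset plus the record's position in the batch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, reduced model of a producer batch that fires one callback per record. */
final class BatchSketch {
    /** Stand-in for the user callback: exactly one of the two arguments is non-null. */
    interface Callback {
        void onCompletion(Long offset, Exception exception);
    }

    // One entry per appended record; null means the record was sent without a callback.
    private final List<Callback> thunks = new ArrayList<>();

    void append(Callback callback) {
        thunks.add(callback);
    }

    /** Completes the batch and runs every record's callback, isolating callback failures. */
    void done(long baseOffset, RuntimeException exception) {
        for (int i = 0; i < thunks.size(); i++) {
            Callback callback = thunks.get(i);
            if (callback == null)
                continue;
            try {
                if (exception == null)
                    callback.onCompletion(baseOffset + i, null); // success: offset of this record
                else
                    callback.onCompletion(null, exception);      // failure: same error for all records
            } catch (Exception e) {
                // A throwing user callback must not prevent the remaining callbacks from running.
                System.err.println("Error executing user-provided callback: " + e);
            }
        }
    }
}
```

Catching exceptions around each individual call is the important design choice: one misbehaving user callback is logged and skipped, and the rest of the records in the batch still get notified.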



