
Kafka source code analysis (7): how the producer handles response data (sticky/split packets / staged responses / callbacks)

Solving the TCP sticky/split packet problem

Inside pollSelectionKeys, attemptRead calls channel.read; receive.complete() then checks whether the received message is complete:

public NetworkReceive read() throws IOException {
        NetworkReceive result = null;
		
		// initialize the NetworkReceive object
        if (receive == null) {
            receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
        }
		// receive data
        receive(receive);
        if (receive.complete()) {
            receive.payload().rewind();
            result = receive;
            receive = null;
        } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
            //pool must be out of memory, mute ourselves.
            mute();
        }
        return result;
    }

The receive method solves the sticky/split packet problem. size is a fixed-size (4-byte) ByteBuffer that holds the length of the message body. While size still has remaining space, the length header itself has not been fully read; this covers the case where the 4-byte size field was split across packets. Once size.position == size.limit the header is complete: after rewind() the full size is read out, a ByteBuffer of that size is allocated, and the payload is then read from the channel into that buffer.

public long readFrom(ScatteringByteChannel channel) throws IOException {
        int read = 0;
        if (size.hasRemaining()) {
            int bytesRead = channel.read(size);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
            if (!size.hasRemaining()) {
                size.rewind();
                int receiveSize = size.getInt();
                if (receiveSize < 0)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                if (maxSize != UNLIMITED && receiveSize > maxSize)
                    throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
                requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
                if (receiveSize == 0) {
                    buffer = EMPTY_BUFFER;
                }
            }
        }
        if (buffer == null && requestedBufferSize != -1) { //we know the size we want but haven't been able to allocate it yet
            buffer = memoryPool.tryAllocate(requestedBufferSize);
            if (buffer == null)
                log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
        }
        if (buffer != null) {
            int bytesRead = channel.read(buffer);
            if (bytesRead < 0)
                throw new EOFException();
            read += bytesRead;
        }

        return read;
    }

receive.complete() checks completeness using the positions of both the size buffer and the payload buffer:

public boolean complete() {
        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
    }
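The same length-prefix framing idea can be shown in a compact, self-contained sketch (this is not Kafka's actual class): a 4-byte size header followed by the payload, where feed() may receive arbitrarily small fragments and only returns a frame once both the header and the full payload have arrived.

```java
import java.nio.ByteBuffer;

// Minimal sketch of length-prefixed framing over partial reads.
// Mirrors the NetworkReceive idea: first fill a fixed 4-byte size
// buffer, then allocate and fill the payload buffer.
class SimpleFrameDecoder {
    private final ByteBuffer size = ByteBuffer.allocate(4); // fixed-size length header
    private ByteBuffer payload;                             // allocated once size is known

    /** Feed one fragment; returns the complete frame, or null if more bytes are needed. */
    byte[] feed(ByteBuffer fragment) {
        if (size.hasRemaining()) {            // still reading the 4-byte header
            copy(fragment, size);
            if (size.hasRemaining())
                return null;                  // the header itself was split across packets
            size.rewind();
            payload = ByteBuffer.allocate(size.getInt());
        }
        copy(fragment, payload);
        if (payload.hasRemaining())
            return null;                      // payload split across packets
        byte[] frame = payload.array();
        size.clear();                         // reset for the next frame (sticky packets)
        payload = null;
        return frame;
    }

    private static void copy(ByteBuffer src, ByteBuffer dst) {
        while (src.hasRemaining() && dst.hasRemaining())
            dst.put(src.get());
    }
}
```

A sticky fragment containing the tail of one frame plus the head of the next is handled by feeding the same buffer again after a frame is returned, since feed() stops consuming as soon as a frame completes.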
Processing staged responses

In the client's pollSelectionKeys there is a step attemptRead(key, channel), which in turn calls addToStagedReceives to put the returned receive into stagedReceives. Each channel maps to a queue holding these receives (responses that have been fully received but not yet exposed to the user):

private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
        if (!stagedReceives.containsKey(channel))
            stagedReceives.put(channel, new ArrayDeque<>());

        Deque<NetworkReceive> deque = stagedReceives.get(channel);
        deque.add(receive);
    }
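The containsKey/put pair above can be written more idiomatically with computeIfAbsent. A minimal sketch with placeholder types (String standing in for the channel, Integer for the receive; not Kafka's actual classes):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Per-channel staging structure: one deque of received items per channel.
// computeIfAbsent creates the deque on first use, replacing the
// containsKey/put dance in one atomic-looking expression.
class StagingDemo {
    private final Map<String, Deque<Integer>> staged = new HashMap<>();

    void stage(String channel, Integer receive) {
        staged.computeIfAbsent(channel, k -> new ArrayDeque<>()).add(receive);
    }

    // remove and return everything staged for one channel
    Deque<Integer> drain(String channel) {
        return staged.remove(channel);
    }
}
```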

addToCompletedReceives then processes stagedReceives:

When stagedReceives is non-empty, the deque of NetworkReceives for each channel is taken out and its entries are moved into completedReceives:

private void addToCompletedReceives() {
        if (!this.stagedReceives.isEmpty()) {
            Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                KafkaChannel channel = entry.getKey();
                if (!explicitlyMutedChannels.contains(channel)) {
                    Deque<NetworkReceive> deque = entry.getValue();
                    addToCompletedReceives(channel, deque);
                    if (deque.isEmpty())
                        iter.remove();
                }
            }
        }
    }

At this point the responses are stored.

handleCompletedSends(responses, updatedNow) in NetworkClient handles sends that do not expect a response.

handleCompletedReceives(responses, updatedNow) in NetworkClient processes the responses collected in pollSelectionKeys.

The line InFlightRequest req = inFlightRequests.completeNext(source) removes from inFlightRequests the in-flight request whose response has just arrived:

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
        for (NetworkReceive receive : this.selector.completedReceives()) {
            String source = receive.source();
            InFlightRequest req = inFlightRequests.completeNext(source);
            Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
            if (log.isTraceEnabled()) {
                log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                    req.header.apiKey(), req.header.correlationId(), responseStruct);
            }
            // If the received response includes a throttle delay, throttle the connection.
            AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
            maybeThrottle(body, req.header.apiVersion(), req.destination, now);
			// a metadata response: hand it to the metadata updater
            if (req.isInternalRequest && body instanceof MetadataResponse)
                metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
            // API version negotiation response
            else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
                handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
            else
                // a normal response
                responses.add(req.completed(body, now));
        }
    }
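Why does completeNext(source) pick the right request? Requests on a Kafka connection are answered in the order they were sent, so a response from a node always matches the oldest outstanding request to that node. A minimal sketch of that per-node FIFO bookkeeping (the names InFlightTracker and TrackedRequest are illustrative, not Kafka's):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Per-node FIFO of outstanding requests. Because responses arrive in
// send order on each connection, completing a response just means
// popping the oldest tracked request for that node.
class InFlightTracker {
    static class TrackedRequest {
        final int correlationId;
        TrackedRequest(int correlationId) { this.correlationId = correlationId; }
    }

    private final Map<String, Deque<TrackedRequest>> inFlight = new HashMap<>();

    void send(String node, TrackedRequest req) {
        // newest requests go to the head...
        inFlight.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(req);
    }

    TrackedRequest completeNext(String node) {
        // ...so each response matches the request at the tail (the oldest)
        return inFlight.get(node).pollLast();
    }
}
```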
Callback functions

The callback is passed in on the KafkaProducer send path, carried all the way down into the RecordAccumulator where it is stored in a Thunk, and finally triggered in NetworkClient's completeResponses stage:

completeResponses(responses)

RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

Now look at handleProduceResponse: when there is a response, each partition response (partResp) is handled by the completeBatch method:

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
        RequestHeader requestHeader = response.requestHeader();
        long receivedTimeMs = response.receivedTimeMs();
        int correlationId = requestHeader.correlationId();
        if (response.wasDisconnected()) {
    // omitted
        } else {
            log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId);
            // if we have a response, parse it
            if (response.hasResponse()) {
                ProduceResponse produceResponse = (ProduceResponse) response.responseBody();
                for (Map.Entry<TopicPartition, ProduceResponse.PartitionResponse> entry : produceResponse.responses().entrySet()) {
                    TopicPartition tp = entry.getKey();
                    ProduceResponse.PartitionResponse partResp = entry.getValue();
                    ProducerBatch batch = batches.get(tp);
                    completeBatch(batch, partResp, correlationId, now, receivedTimeMs + produceResponse.throttleTimeMs());
                }
                this.sensors.recordLatency(response.destination(), response.requestLatencyMs());
            } else {
                // this is the acks = 0 case: no response expected, just complete all requests
                for (ProducerBatch batch : batches.values()) {
                    completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
                }
            }
        }
    }

The most important part of completeBatch is this:

if (batch.done(response.baseOffset, response.logAppendTime, null)) {
            maybeRemoveFromInflightBatches(batch);
            this.accumulator.deallocate(batch);
        }

Inside done() the key call is completeFutureAndFireCallbacks:

completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);

Here callback.onCompletion finally fires the user-provided callback:

private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
        produceFuture.set(baseOffset, logAppendTime, exception);

        // each record corresponds to one Thunk; loop through and invoke each record's Callback
        for (Thunk thunk : thunks) {
            try {
                if (exception == null) {
                    RecordMetadata metadata = thunk.future.value();
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(metadata, null);
                } else {
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }

        produceFuture.done();
    }
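The thunk pattern above can be reduced to a small sketch: callbacks registered per record are collected in a list and fired together, exactly once, when the batch completes, and a failing user callback must not prevent the others from running. All names below are illustrative, not Kafka's:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Minimal thunk pattern: remember user callbacks at send time,
// fire them all when the batch's result (or error) arrives.
class BatchResult {
    private final List<BiConsumer<Long, Exception>> thunks = new ArrayList<>();

    // called on send(): remember the user's callback for this record
    void addCallback(BiConsumer<Long, Exception> cb) {
        thunks.add(cb);
    }

    // called when the broker response (or error) arrives
    void complete(Long baseOffset, Exception error) {
        for (BiConsumer<Long, Exception> cb : thunks) {
            try {
                cb.accept(error == null ? baseOffset : null, error);
            } catch (Exception e) {
                // a misbehaving user callback must not break the others
                System.err.println("callback failed: " + e);
            }
        }
    }
}
```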
Original article: https://www.mshxw.com/it/278910.html