一、客户端消息接收入口上一篇介绍了消费者请求拉取消息,最后发送给消费者。本章介绍消费者接收。
public class ClientCnx {
protected void handleMessage(CommandMessage cmdMessage, ByteBuf headersAndPayload) {
checkArgument(state == State.Ready);
ConsumerImpl> consumer = consumers.get(cmdMessage.getConsumerId());
if (consumer != null) {
List ackSets = Collections.emptyList();
if (cmdMessage.getAckSetsCount() > 0) {
ackSets = new ArrayList<>(cmdMessage.getAckSetsCount());
for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
ackSets.add(cmdMessage.getAckSetAt(i));
}
}
// 接收
consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), ackSets, headersAndPayload, this);
}
}
}
继续consumer.messageReceived
二、单个消费者ConsumerImpl接收消息处理public class ConsumerImpl三、用户回调消息处理{ void messageReceived(MessageIdData messageId, int redeliveryCount, List ackSet, ByteBuf headersAndPayload, ClientCnx cnx) { if (!verifyChecksum(headersAndPayload, messageId)) { // discard message with checksum error discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch); return; } Messagemetadata msgmetadata; try { msgmetadata = Commands.parseMessagemetadata(headersAndPayload); } catch (Throwable t) { discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch); return; } // 批量发送可能>1 final int numMessages = msgmetadata.getNumMessagesInBatch(); final int numChunks = msgmetadata.hasNumChunksFromMsg() ? msgmetadata.getNumChunksFromMsg() : 0; // 分块 final boolean isChunkedMessage = numChunks > 1 && conf.getSubscriptionType() != SubscriptionType.Shared; MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex()); // 重复检查 if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) { increaseAvailablePermits(cnx, numMessages); return; } // 解密 ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgmetadata, headersAndPayload, cnx); boolean isMessageUndecryptable = isMessageUndecryptable(msgmetadata); if (decryptedPayload == null) { return; } ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage) ? decryptedPayload.retain() : uncompressPayloadIfNeeded(messageId, msgmetadata, decryptedPayload, cnx, true); decryptedPayload.release(); if (uncompressedPayload == null) { return; } // 非批量处理 if (isMessageUndecryptable || (numMessages == 1 && !msgmetadata.hasNumMessagesInBatch())) { if (isChunkedMessage) { uncompressedPayload = processMessageChunk(uncompressedPayload, msgmetadata, msgId, messageId, cnx); if (uncompressedPayload == null) { return; } } // If the topic is non-persistent, we should not ignore any messages. if (this.topicName.isPersistent() && isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) { uncompressedPayload.release(); return; } final MessageImpl message = MessageImpl.create(topicName.toString(), msgId, msgmetadata, uncompressedPayload, createEncryptionContext(msgmetadata), cnx, schema, redeliveryCount, poolMessages); uncompressedPayload.release(); internalPinnedExecutor.execute(() -> { // 有死信队列并且超过最大重新投递次数,说明当前是最后一次投递了 先放入map // 消息还是会发给用户,如果用户再拒绝会直接发往死信队列 if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) { possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(), Collections.singletonList(message)); } // 有正在等待接收的请求直接放入 // 比如调用receive()阻塞,当前获取到阻塞的CompletableFuture然后complete if (hasNextPendingReceive()) { notifyPendingReceivedCallback(message, null); } // 没有等待的消费者,消费者可能正在处理业务。 // 放入等待队列incomingMessages // 如果有等待的批量接收 获取批量的CompletableFuture然后complete else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) { notifyPendingBatchReceivedCallBack(); } }); } else { // 批量处理 receiveIndividualMessagesFromBatch(msgmetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx); uncompressedPayload.release(); } // 如果用户是使用的MessageListener监听则调用Listener // 需要知道上面处理完消息已经存入incomingMessages internalPinnedExecutor.execute(() -> tryTriggerListener()); }
继续tryTriggerListener
protected void triggerListener() {
try {
// 说明只能有一个线程调用用户的发送,executorQueueSize默认0
// 如果失败也没关系,因为消息已经存入incomingMessages,下面是个递归
if (executorQueueSize.get() < 1) {
// 从incomingMessages.poll一条
final Message msg = internalReceive(0, TimeUnit.MILLISECONDS);
if (msg != null) {
// 加1
executorQueueSize.incrementAndGet();
// Key_Shared使用同一个线程
if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->
// 处理消息
callMessageListener(msg));
} else {
getExternalExecutor(msg).execute(() -> {
callMessageListener(msg);
});
}
}
}
} catch (PulsarClientException e) {
return;
}
}
继续callMessageListener(...)
protected void callMessageListener(Messagemsg) { try { // 用户的监听 listener.received(Consumerbase.this, msg); } catch (Throwable t) { } finally { // 减1成0 executorQueueSize.decrementAndGet(); // 递归 triggerListener(); } } }
numListenerThreads配置的线程数
如果只有一个消费者可以看出配1个和配10个区别不大,如果想提高消费性能配10个应该创建10个consumer客户端。
上面这条路线是单个消费者的实现
接下来看一下多消费者MultiTopicsConsumerImpl
之前有讲过这篇创建的实现,最后是没介绍跟消费相关的。
Pulsar源码解析-客户端-多消费者MultiTopicsConsumerImpl创建底层实现
我们接上上面这篇
public class MultiTopicsConsumerImplextends Consumerbase { private void startReceivingMessages(List > newConsumers) { if (getState() == State.Ready) { newConsumers.forEach(consumer -> { consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize()); internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer)); }); } } }
可以看到每个具体消费者都会先调用拉取请求,然后调用receiveMessageFromConsumer(consumer)
继续receiveMessageFromConsumer(...)
public class MultiTopicsConsumerImplextends Consumerbase { private void receiveMessageFromConsumer(ConsumerImpl consumer) { // 主动调用ReceiveAsync() consumer.receiveAsync().thenAcceptAsync(message -> { // 处理接收 messageReceived(consumer, message); // 等待队列超过最大配置挂起消费者 int size = incomingMessages.size(); if (size >= maxReceiverQueueSize || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) { pausedConsumers.add(consumer); resumeReceivingFromPausedConsumersIfNeeded(); } else { // 递归当前方法 receiveMessageFromConsumer(consumer); } }, internalPinnedExecutor).exceptionally(ex -> { return null; }); } }
有2点很重要:
1、consumer.receiveAsync()
2、messageReceived(consumer, message);
先看1
public class ConsumerImpl{ protected CompletableFuture > internalReceiveAsync() { // 添加一个异常后回调处理 CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler(); CompletableFuture > result = cancellationHandler.createFuture(); internalPinnedExecutor.execute(() -> { // 当前消费者的等待队列获取 Message message = incomingMessages.poll(); // 服务端没有推送消息 if (message == null) { // 等待 pendingReceives.add(result); cancellationHandler.setCancelAction(() -> pendingReceives.remove(result)); } else { // increaseAvailablePermits+1,记录指标等 messageProcessed(message); // 直接完成 result.complete(beforeConsume(message)); } }); return result; } }
从上面我们可以看出如果incomingMessages没有读到消息consumer.receiveAsync()是阻塞的
上上面讲单个消费者接到消息时校验有没有等待的消费请求,有直接complete。
这里接续看第2点。
public class MultiTopicsConsumerImpl{ private void messageReceived(ConsumerImpl consumer, Message message) { checkArgument(message instanceof MessageImpl); TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message, consumer); // 用户的消费者实现是MultiTopicsConsumerImpl主动调用ReceiveAsync没有数据挂起。 CompletableFuture > receivedFuture = nextPendingReceive(); if (receivedFuture != null) { // 跟踪未ack unAckedMessageTracker.add(topicMessage.getMessageId()); // 直接complete completePendingReceive(receivedFuture, topicMessage); } // 放入等待队列 else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) { notifyPendingBatchReceivedCallBack(); } if (listener != null) { // 上面介绍过了,调用用户的监听 triggerListener(); } } }
继续看一下
public abstract class Consumerbase{ protected boolean canEnqueueMessage(Message message) { return true; } protected boolean enqueueMessageAndCheckBatchReceive(Message message) { int messageSize = message.size(); // canEnqueueMessage上面的空实现true // 放入incomingMessages if (canEnqueueMessage(message) && incomingMessages.offer(message)) { INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize); } return hasEnoughMessagesForBatchReceive(); } }
可以看到有消息的换位:从单个consumerImpl获取到放入多消费者的MultiTopicsConsumerImpl的incomingMessages。
为什么这么做?
因为多消费者创建是MultiTopicsConsumerImpl实例,配置的MessageListener是它在维护,不是具体的ConsumerImpl。调用Receive也是MultiTopicsConsumerImpl.ReceiveAsync()
在MultiTopicsConsumerImpl场景中,内部的ConsumerImpl用来接收消息放入自己的incomingMessages或者直接放入接收请求,MultiTopicsConsumerImpl中启动每个ConsumerImpl进行接收,接收请求获取从incomingMessages读取或者挂起。最终的ack相关api还是由具体的ConsumerImpl操作。



