栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

Pulsar源码解析-客户端-消费者接收消息的底层实现

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Pulsar源码解析-客户端-消费者接收消息的底层实现

上一篇介绍了消费者请求拉取消息,最后发送给消费者。本章介绍消费者接收。

一、客户端消息接收入口
public class ClientCnx {

    protected void handleMessage(CommandMessage cmdMessage, ByteBuf headersAndPayload) {
        checkArgument(state == State.Ready);
        ConsumerImpl consumer = consumers.get(cmdMessage.getConsumerId());
        if (consumer != null) {
            List ackSets = Collections.emptyList();
            if (cmdMessage.getAckSetsCount() > 0) {
                ackSets = new ArrayList<>(cmdMessage.getAckSetsCount());
                for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
                    ackSets.add(cmdMessage.getAckSetAt(i));
                }
            }
            // 接收
            consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), ackSets, headersAndPayload, this);
        }
    }
}

继续consumer.messageReceived

二、单个消费者ConsumerImpl接收消息处理
public class ConsumerImpl {

void messageReceived(MessageIdData messageId, int redeliveryCount, List ackSet, ByteBuf headersAndPayload, ClientCnx cnx) {
        if (!verifyChecksum(headersAndPayload, messageId)) {
            // discard message with checksum error
            discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
            return;
        }

        Messagemetadata msgmetadata;
        try {
            msgmetadata = Commands.parseMessagemetadata(headersAndPayload);
        } catch (Throwable t) {
            discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
            return;
        }
		// 批量发送可能>1
        final int numMessages = msgmetadata.getNumMessagesInBatch();
        final int numChunks = msgmetadata.hasNumChunksFromMsg() ? msgmetadata.getNumChunksFromMsg() : 0;
        // 分块
        final boolean isChunkedMessage = numChunks > 1 && conf.getSubscriptionType() != SubscriptionType.Shared;
		
        MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
        // 重复检查
        if (acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
            increaseAvailablePermits(cnx, numMessages);
            return;
        }
		// 解密
        ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, msgmetadata, headersAndPayload, cnx);

        boolean isMessageUndecryptable = isMessageUndecryptable(msgmetadata);

        if (decryptedPayload == null) {
            return;
        }
        ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage) ? decryptedPayload.retain()
                : uncompressPayloadIfNeeded(messageId, msgmetadata, decryptedPayload, cnx, true);
        decryptedPayload.release();
        if (uncompressedPayload == null) {
            return;
        }
        // 非批量处理
        if (isMessageUndecryptable || (numMessages == 1 && !msgmetadata.hasNumMessagesInBatch())) {

            if (isChunkedMessage) {
                uncompressedPayload = processMessageChunk(uncompressedPayload, msgmetadata, msgId, messageId, cnx);
                if (uncompressedPayload == null) {
                    return;
                }
            }

            // If the topic is non-persistent, we should not ignore any messages.
            if (this.topicName.isPersistent() && isSameEntry(messageId) && isPriorEntryIndex(messageId.getEntryId())) {

                uncompressedPayload.release();
                return;
            }

            final MessageImpl message = MessageImpl.create(topicName.toString(), msgId, msgmetadata,
                    uncompressedPayload, createEncryptionContext(msgmetadata), cnx, schema, redeliveryCount,
                    poolMessages);
            uncompressedPayload.release();
            internalPinnedExecutor.execute(() -> {
            	// 有死信队列并且超过最大重新投递次数,说明当前是最后一次投递了 先放入map
            	// 消息还是会发给用户,如果用户再拒绝会直接发往死信队列
                if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null &&
                        redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
                    possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
                            Collections.singletonList(message));
                }
                // 有正在等待接收的请求直接放入
                // 比如调用receive()阻塞,当前获取到阻塞的CompletableFuture然后complete
                if (hasNextPendingReceive()) {
                    notifyPendingReceivedCallback(message, null);
                }
                // 没有等待的消费者,消费者可能正在处理业务。 
                // 放入等待队列incomingMessages
                // 如果有等待的批量接收 获取批量的CompletableFuture然后complete
                else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
                    notifyPendingBatchReceivedCallBack();
                }
            });
        } else {
			// 批量处理
            receiveIndividualMessagesFromBatch(msgmetadata, redeliveryCount, ackSet, uncompressedPayload, messageId, cnx);

            uncompressedPayload.release();
        }
        // 如果用户是使用的MessageListener监听则调用Listener
        // 需要知道上面处理完消息已经存入incomingMessages
        internalPinnedExecutor.execute(()
                -> tryTriggerListener());

    }
三、用户回调消息处理

继续tryTriggerListener

    protected void triggerListener() {
        try {
        	// 说明只能有一个线程调用用户的发送,executorQueueSize默认0
        	// 如果失败也没关系,因为消息已经存入incomingMessages,下面是个递归
            if (executorQueueSize.get() < 1) {
            	// 从incomingMessages.poll一条
                final Message msg = internalReceive(0, TimeUnit.MILLISECONDS);
                if (msg != null) {
                	// 加1
                    executorQueueSize.incrementAndGet();
                    // Key_Shared使用同一个线程
                    if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
                        executorProvider.getExecutor(peekMessageKey(msg)).execute(() ->								
                        		// 处理消息
                                callMessageListener(msg));
                    } else {
                        getExternalExecutor(msg).execute(() -> {
                            callMessageListener(msg);
                        });
                    }
                }
            }
        } catch (PulsarClientException e) {
            return;
        }
    }

继续callMessageListener(...)

    protected void callMessageListener(Message msg) {
        try {
        	// 用户的监听
            listener.received(Consumerbase.this, msg);
        } catch (Throwable t) {
        } finally {
        	// 减1成0
            executorQueueSize.decrementAndGet();
            // 递归
            triggerListener();
        }
    }
}

numListenerThreads配置的线程数
如果只有一个消费者可以看出配1个和配10个区别不大,如果想提高消费性能配10个应该创建10个consumer客户端。

上面这条路线是单个消费者的实现
接下来看一下多消费者MultiTopicsConsumerImpl

四、客户端-多消费者MultiTopicsConsumerImpl接收消息处理

之前有讲过这篇创建的实现,最后是没介绍跟消费相关的。
Pulsar源码解析-客户端-多消费者MultiTopicsConsumerImpl创建底层实现

我们接上上面这篇

public class MultiTopicsConsumerImpl extends Consumerbase {
    private void startReceivingMessages(List> newConsumers) {
        if (getState() == State.Ready) {
            newConsumers.forEach(consumer -> {
                consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
                internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
            });
        }
    }
}

可以看到每个具体消费者都会先调用拉取请求,然后调用receiveMessageFromConsumer(consumer)
继续receiveMessageFromConsumer(...)

public class MultiTopicsConsumerImpl extends Consumerbase {

private void receiveMessageFromConsumer(ConsumerImpl consumer) {
		// 主动调用ReceiveAsync()
        consumer.receiveAsync().thenAcceptAsync(message -> {
        	// 处理接收
            messageReceived(consumer, message);
			
			// 等待队列超过最大配置挂起消费者
            int size = incomingMessages.size();
            if (size >= maxReceiverQueueSize
                    || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
                pausedConsumers.add(consumer);
                resumeReceivingFromPausedConsumersIfNeeded();
            } else {
            	// 递归当前方法
                receiveMessageFromConsumer(consumer);
            }
        }, internalPinnedExecutor).exceptionally(ex -> {
            return null;
        });
    }
}

有2点很重要:
1、consumer.receiveAsync()
2、messageReceived(consumer, message);
先看1

public class ConsumerImpl {

    protected CompletableFuture> internalReceiveAsync() {
    	// 添加一个异常后回调处理
        CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
        CompletableFuture> result = cancellationHandler.createFuture();
        internalPinnedExecutor.execute(() -> {
        	// 当前消费者的等待队列获取
            Message message = incomingMessages.poll();
            // 服务端没有推送消息
            if (message == null) {
            	// 等待
                pendingReceives.add(result);
                cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
            } else {
            	// increaseAvailablePermits+1,记录指标等
                messageProcessed(message);
                // 直接完成
                result.complete(beforeConsume(message));
            }
        });

        return result;
    }
}

从上面我们可以看出如果incomingMessages没有读到消息consumer.receiveAsync()是阻塞的
上上面讲单个消费者接到消息时校验有没有等待的消费请求,有直接complete。
这里接续看第2点。

public class MultiTopicsConsumerImpl {

    private void messageReceived(ConsumerImpl consumer, Message message) {
        checkArgument(message instanceof MessageImpl);
        TopicMessageImpl topicMessage = new TopicMessageImpl<>(consumer.getTopic(),
                consumer.getTopicNameWithoutPartition(), message, consumer);
                // 用户的消费者实现是MultiTopicsConsumerImpl主动调用ReceiveAsync没有数据挂起。
        CompletableFuture> receivedFuture = nextPendingReceive();
        if (receivedFuture != null) {
        	// 跟踪未ack
            unAckedMessageTracker.add(topicMessage.getMessageId());
            // 直接complete
            completePendingReceive(receivedFuture, topicMessage);
        } 
        // 放入等待队列
        else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
            notifyPendingBatchReceivedCallBack();
        }

        if (listener != null) {
        	// 上面介绍过了,调用用户的监听
            triggerListener();
        }
    }
}

继续看一下

public abstract class Consumerbase {

    protected boolean canEnqueueMessage(Message message) {
        return true;
    }

    protected boolean enqueueMessageAndCheckBatchReceive(Message message) {
        int messageSize = message.size();
        // canEnqueueMessage上面的空实现true
        // 放入incomingMessages
        if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
        }
        return hasEnoughMessagesForBatchReceive();
    }
}

可以看到有消息的换位:从单个consumerImpl获取到放入多消费者的MultiTopicsConsumerImpl的incomingMessages。
为什么这么做?
因为多消费者创建是MultiTopicsConsumerImpl实例,配置的MessageListener是它在维护,不是具体的ConsumerImpl。调用Receive也是MultiTopicsConsumerImpl.ReceiveAsync()
在MultiTopicsConsumerImpl场景中,内部的ConsumerImpl用来接收消息放入自己的incomingMessages或者直接放入接收请求,MultiTopicsConsumerImpl中启动每个ConsumerImpl进行接收,接收请求获取从incomingMessages读取或者挂起。最终的ack相关api还是由具体的ConsumerImpl操作。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/687186.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号