
Pulsar Source Code Analysis - Server Side - Producer Message Sending Internals



The previous post covered the client-side send implementation. The client sends to the server; let's walk through the source to see how the server handles the message.

1. Server-side entry point for sending
public class ServerCnx {

   protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
        // Look up the producer stored when it was created
        CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());
        Producer producer = producerFuture.getNow(null);
        // For non-persistent topics, check whether the pending message count
        // exceeds the maximum (default 1000)
        if (producer.isNonPersistentTopic()) {
            if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) {
                final long producerId = send.getProducerId();
                final long sequenceId = send.getSequenceId();
                final long highestSequenceId = send.getHighestSequenceId();
                service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> {
                    commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1, -1);
                }));
                producer.recordMessageDrop(send.getNumMessages());
                return;
            } else {
                nonPersistentPendingMessages++;
            }
        }
        // Check whether the current send rate or the number of in-flight
        // messages exceeds the configured maximum
        startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());
        // Transaction handling omitted

        // About highestSequenceId and sequenceId:
        // The client sends in batches, and every message in a batch has its own
        // sequence id (a batch may also contain a single message). The largest
        // in the batch is highestSequenceId; the smallest is the lowest, which
        // here is send.getSequenceId(), because a batch's sequence id is that
        // of the first message added to it.
        if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
            // batched send
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
                    headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
        } else {
            // single message
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
                    send.getNumMessages(), send.isIsChunk(), send.isMarker());
        }
    }
}

The key call is producer.publishMessage(...); we'll follow the most complex path, the batched send.
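The batch-vs-single branch above keys off the sequence ids. A toy sketch of that condition (the helper and the id values are made up for illustration, not Pulsar code):

```java
// Sketch: how the broker distinguishes a batched send from a single send.
// A batch carries the first-added message's sequence id as sequenceId and
// the last-added one as highestSequenceId.
public class SendBranch {
    static String branch(boolean hasHighest, long sequenceId, long highestSequenceId) {
        // batch path when a highest id is present and lowest <= highest
        return (hasHighest && sequenceId <= highestSequenceId) ? "batch" : "single";
    }

    public static void main(String[] args) {
        // a 3-message batch with ids 100..102
        if (!branch(true, 100, 102).equals("batch")) throw new AssertionError();
        // a single message: no highest sequence id set
        if (!branch(false, 100, -1).equals("single")) throw new AssertionError();
    }
}
```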

2. Sending in Producer: first callback layer, MessagePublishContext
public class Producer {

    public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
            ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
            // If the lowest sequence id is greater than the highest, the
            // client's batching is clearly broken
        if (lowestSequenceId > highestSequenceId) {
            cnx.execute(() -> {
                cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.metadataError,
                        "Invalid lowest or highest sequence id");
                cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
            });
            return;
        }
        // Validates message integrity and encryption, and records the send
        // rate for publish throttling
        if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) {
            // send
            publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked,
                    isMarker);
        }
    }
}

The key call here is publishMessageToTopic(...):

    private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
                                       long batchSize, boolean isChunked, boolean isMarker) {
          // Ultimately the topic performs the send.
          // MessagePublishContext packs the send parameters into a request
          // context object, and also serves as the send callback.
        topic.publishMessage(headersAndPayload,
                MessagePublishContext.get(this, lowestSequenceId,
                        highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
                        isChunked, System.nanoTime(), isMarker));
    }

Next, the persistent-topic path.
The non-persistent implementation is simple: on receiving a message, find the consumers subscribed to the topic and dispatch it to them; if a peer (replicated) cluster exists, forward a copy. That's it.

3. Sending in the topic: second callback layer, PersistentTopic
public class PersistentTopic {

    @Override
    public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
        // Pending write count +1; the matching -1 normally happens in the
        // success callback, analyzed later
        pendingWriteOps.incrementAndGet();
        // the topic may have been deleted
        if (isFenced) {
            publishContext.completed(new TopicFencedException("fenced"), -1, -1);
            // The -1 mentioned above; every failure path below must also
            // decrement. It also checks whether the topic can be torn down:
            // disconnect and close all producers and consumers on this topic,
            // close the current ledger, close the bundle, sync to peer
            // clusters -- in short, clean up nearly everything tied to the topic.
            decrementPendingWriteOpsAndCheck();
            return;
        }
        // Check whether the message exceeds the configured maximum size;
        // this setting can be scoped down to the topic level
        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
            publishContext.completed(new NotAllowedException("Exceed maximum message size"), -1, -1);
            // same as above
            decrementPendingWriteOpsAndCheck();
            return;
        }
        // deduplication: has this message already been persisted?
        MessageDeduplication.MessageDupStatus status =
                messageDeduplication.isDuplicate(publishContext, headersAndPayload);
        switch (status) {
            case NotDup:
                // append the entry
                asyncAddEntry(headersAndPayload, publishContext);
                break;
            case Dup:
                // Immediately acknowledge duplicated message
                publishContext.completed(null, -1, -1);
                decrementPendingWriteOpsAndCheck();
                break;
            default:
                publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1);
                decrementPendingWriteOpsAndCheck();
        }
    }
}

The key call is asyncAddEntry(headersAndPayload, publishContext):

    private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
        // is broker entry metadata (interceptors) enabled?
        if (brokerService.isBrokerEntryMetadataEnabled()) {
            ledger.asyncAddEntry(headersAndPayload,
                    (int) publishContext.getNumberOfMessages(), this, publishContext);
        } else {
            ledger.asyncAddEntry(headersAndPayload, this, publishContext);
        }
    }

We continue into ledger.asyncAddEntry(...):

4. Sending in ManagedLedgerImpl: third callback layer, OpAddEntry
public class ManagedLedgerImpl {
    public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx) {

        // Wrap everything into an OpAddEntry, which also acts as the callback
        OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);
        // Adds for the same topic always execute on the same thread
        executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
    }
}

We'll come back to OpAddEntry's callback body later.
For now, a recap: there are already several callback layers:
1. Producer delegates the send to the topic; the callback is MessagePublishContext.
2. The topic delegates the send to the ledger; the callback is the topic object itself.
3. The ledger performs the send; the callback is OpAddEntry.
The granularity goes from coarse to fine, and completions will later unwind layer by layer back up.
Why design it this way? Each layer has its own responsibility; avoiding synchronous calls widens concurrency and improves overall throughput.
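Under stated assumptions, that layering can be sketched with CompletableFuture chaining. All names below are illustrative stand-ins for the real Pulsar types, not the actual classes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the three-layer callback chain: each layer does its own
// bookkeeping and delegates downward; completion unwinds back up.
public class LayeredCallbacks {
    static final AtomicLong pendingWriteOps = new AtomicLong();

    // innermost layer (stand-in for OpAddEntry): pretend the store assigned entry id 7
    static CompletableFuture<Long> ledgerAddEntry(byte[] payload) {
        return CompletableFuture.completedFuture(7L);
    }

    // middle layer (stand-in for PersistentTopic): count a pending write,
    // then undo it when the lower layer completes
    static CompletableFuture<Long> topicPublish(byte[] payload) {
        pendingWriteOps.incrementAndGet();
        return ledgerAddEntry(payload)
                .whenComplete((id, ex) -> pendingWriteOps.decrementAndGet());
    }

    // outer layer (stand-in for MessagePublishContext): where the send
    // receipt would be written back to the client
    static CompletableFuture<Long> producerPublish(byte[] payload) {
        return topicPublish(payload);
    }

    public static void main(String[] args) {
        long entryId = producerPublish("hello".getBytes()).join();
        if (entryId != 7L) throw new AssertionError();
        // the pending counter was decremented on the way back up
        if (pendingWriteOps.get() != 0L) throw new AssertionError();
    }
}
```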

We continue into internalAsyncAddEntry(addOperation):

public class ManagedLedgerImpl {

// This method is synchronized: it updates shared statistics (entry count,
// ledger size, etc.) that would otherwise race
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
        // interceptor hook
        if (!beforeAddEntry(addOperation)) {
            return;
        }
        // current ledger state
        final State state = STATE_UPDATER.get(this);
        // error-state checks; self-explanatory
        if (state == State.Fenced) {
            addOperation.failed(new ManagedLedgerFencedException());
            return;
        } else if (state == State.Terminated) {
            addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
            return;
        } else if (state == State.Closed) {
            addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
            return;
        } else if (state == State.WriteFailed) {
            addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
            return;
        }
        // Queue the pending entry; it is removed in the BookKeeper success callback
        pendingAddEntries.add(addOperation);
        // transitional states
        if (state == State.ClosingLedger || state == State.CreatingLedger) {
            if (State.CreatingLedger == state) {
                long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp;
                if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) {
                    this.createComplete(Code.TimeoutException, null, null);
                }
            }
        } else if (state == State.ClosedLedger) {
            if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
                this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
                mbean.startDataLedgerCreateOp();
                asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
            }
        } else {
            // attach the current ledger to this add operation
            addOperation.setLedger(currentLedger);
            // Count the new entry. Note a batch still counts as +1 rather than
            // being split: a batch is stored as one entry containing multiple messages
            ++currentLedgerEntries;
            // add to the ledger's total byte count
            currentLedgerSize += addOperation.data.readableBytes();
            // If the ledger is full (entry count, total size, or rollover time
            // limit reached), close it and switch to a new ledger
            if (currentLedgerIsFull()) {
                addOperation.setCloseWhenDone(true);
                STATE_UPDATER.set(this, State.ClosingLedger);
            }
            // kick off the write
            addOperation.initiate();
        }
    }
}
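currentLedgerIsFull() rolls to a new ledger when any of three limits is hit. A minimal sketch of that decision (the parameter names and example thresholds below are mine; Pulsar reads the real limits from its managed-ledger configuration):

```java
// Sketch of the ledger-rollover decision: a ledger is "full" when it has
// too many entries, too many bytes, or has been open too long.
public class LedgerRollover {
    static boolean ledgerIsFull(long entries, long sizeBytes, long ageMs,
                                long maxEntries, long maxSizeBytes, long maxAgeMs) {
        return entries >= maxEntries || sizeBytes >= maxSizeBytes || ageMs >= maxAgeMs;
    }

    public static void main(String[] args) {
        // under all thresholds: keep writing to the current ledger
        if (ledgerIsFull(10, 1024, 1000, 50_000, 1_073_741_824L, 600_000)) {
            throw new AssertionError();
        }
        // entry count reached: close the current ledger and open a new one
        if (!ledgerIsFull(50_000, 1024, 1000, 50_000, 1_073_741_824L, 600_000)) {
            throw new AssertionError();
        }
    }
}
```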

The key is the last line, addOperation.initiate():

public class OpAddEntry {
    public void initiate() {
        if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) {

            ByteBuf duplicateBuffer = data.retainedDuplicate();

            addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
            lastInitTime = System.nanoTime();
            // Note: this "ledger" is no longer the ManagedLedger object above;
            // it is a LedgerHandle, the client used to talk to BookKeeper
            ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
        }
    }
}

We continue into ledger.asyncAddEntry(...):

5. Sending in LedgerHandle: fourth callback layer, PendingAddOp
public class LedgerHandle {
    public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
        // getCurrentEnsemble() returns the current bookie list, used for the write below
        PendingAddOp op = PendingAddOp.create(this, clientCtx, getCurrentEnsemble(), data, writeFlags, cb, ctx);
        doAsyncAddEntry(op);
    }
}

We continue into doAsyncAddEntry(op):

public class LedgerHandle {

protected void doAsyncAddEntry(final PendingAddOp op) {
        // client-side throttling of BookKeeper requests
        if (throttler != null) {
            throttler.acquire();
        }

        boolean wasClosed = false;
        synchronized (this) {
            // handle state check
            if (isHandleWritable()) {
                // The entryId is generated client-side by simple increment:
                // the last successfully pushed entryId + 1. It only needs to
                // be unique within a single ledger.
                long entryId = ++lastAddPushed;
                long currentLedgerLength = addToLength(op.payload.readableBytes());
                op.setEntryId(entryId);
                op.setLedgerLength(currentLedgerLength);
                // again queued pending the send; removed in the callback
                pendingAddOps.add(op);
            } else {
                wasClosed = true;
            }
        }
        // error path for a closed handle; skipping the details
        if (wasClosed) {
            try {
                clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        op.cb.addCompleteWithLatency(BKException.Code.LedgerClosedException,
                                LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
                    }

                    @Override
                    public String toString() {
                        return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
                    }
                });
            } catch (RejectedExecutionException e) {
                op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
                                                                    BKException.Code.InterruptedException),
                        LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
            }
            return;
        }
        // This line is key -- one of the highlights of Pulsar's storage
        // design. It is explained separately right after this method.
        DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
        try {
            // verify that the replicas in the computed write set are all writable
            if (!waitForWritable(ws, 0, clientCtx.getConf().waitForWriteSetMs)) {
                op.allowFailFastOnUnwritableChannel();
            }
        } finally {
            ws.recycle();
        }

        try {
            // Executed by the worker pool; the same ledgerId always maps to
            // the same thread
            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
        } catch (RejectedExecutionException e) {
            op.cb.addCompleteWithLatency(
                    BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException),
                    LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
        }
    }
}

A digression to explain DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId()).
Its internal implementation is the code below.
To make writes highly available, three settings are configured:
ensembleSize: how many storage nodes (bookies) are used in total
writeQuorumSize: how many replicas each entry is written to
ackQuorumSize: how many replica acks count as success

With the settings below -- an ensemble of 5, 3 replicas written, 3 acks required -- a request succeeds once all 3 writes succeed.
Number the 5 bookies 1 through 5.
Since entryId increments by 1 (the ++lastAddPushed above),
the first entry is written to bookies 1, 2, 3;
the second to 2, 3, 4;
the third to 3, 4, 5;
the fourth to 4, 5, 1.

This is striped writing.

    private void reset(int ensembleSize, int writeQuorumSize, long entryId) {
        setSize(writeQuorumSize);
        for (int w = 0; w < writeQuorumSize; w++) {
            set(w, (int) ((entryId + w) % ensembleSize));
        }
    }
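The rotation can be reproduced standalone. Here is a sketch of the same modular arithmetic (class and method names are mine, not BookKeeper's), using 0-based bookie indices:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of BookKeeper's round-robin (striped) write-set selection:
// entry e with ensembleSize E and writeQuorumSize W goes to bookies
// (e % E), (e+1) % E, ..., (e+W-1) % E.
public class Striping {
    public static List<Integer> writeSet(long entryId, int ensembleSize, int writeQuorumSize) {
        List<Integer> set = new ArrayList<>(writeQuorumSize);
        for (int w = 0; w < writeQuorumSize; w++) {
            set.add((int) ((entryId + w) % ensembleSize));
        }
        return set;
    }

    public static void main(String[] args) {
        // E=5, W=3: successive entries rotate across the ensemble,
        // matching the 1,2,3 / 2,3,4 / 3,4,5 / 4,5,1 example (0-indexed here)
        if (!writeSet(0, 5, 3).equals(List.of(0, 1, 2))) throw new AssertionError();
        if (!writeSet(1, 5, 3).equals(List.of(1, 2, 3))) throw new AssertionError();
        if (!writeSet(3, 5, 3).equals(List.of(3, 4, 0))) throw new AssertionError();
    }
}
```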

Back to the hand-off to the worker pool, clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op): op must implement Runnable directly or indirectly, so we go straight to its run body (safeRun).

class PendingAddOp {

    public void safeRun() {
        hasRun = true;
        if (callbackTriggered) {
            maybeRecycle();
            return;
        }

        this.requestTimeNanos = MathUtils.nowInNano();
        checkNotNull(lh);
        checkNotNull(lh.macManager);
        // package the data to send (digest + framing)
        this.toSend = lh.macManager.computeDigestAndPackageForSending(
                entryId, lh.lastAddConfirmed, currentLedgerLength,
                payload);
        payload = null;
        lh.maybeHandleDelayedWriteBookieFailure();
        // covered above: compute the striped write set
        DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);

        try {
            // loop over the write set, sending one write per replica
            for (int i = 0; i < writeSet.size(); i++) {
                sendWriteRequest(ensemble, writeSet.get(i));
            }
        } finally {
            writeSet.recycle();
        }
    }
}

We continue into sendWriteRequest(ensemble, writeSet.get(i)):

class PendingAddOp {

    void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
        int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;

        // toSend is the packaged payload; the loop outside passes each replica
        // index, and ensemble.get(bookieIndex) resolves it to a bookie
        clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex),
                                             lh.ledgerId, lh.ledgerKey, entryId, toSend, this, bookieIndex,
                                             flags, allowFailFast, lh.writeFlags);

        // count of in-flight write requests; decremented in the bookie
        // response callback
        ++pendingWriteRequests;
    }
}

Note that addEntry above is a BookieClient call, and the current PendingAddOp is once again the callback: it implements the interface below, and writeComplete fires once for each replica write.
We'll stop descending here; the rest is BookKeeper territory for a future post. In fact addEntry itself contains further callbacks and plenty more machinery, since at this point we have only just reached the BookKeeper client, which still has to frame the request according to the BookKeeper wire protocol, and so on.
So let's assume BookKeeper responded successfully and writeComplete was invoked, and look at its implementation.

    public interface WriteCallback {
        void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx);
    }
    public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
        int bookieIndex = (Integer) ctx;
        --pendingWriteRequests;
        // Error case: the request went to one bookie but the response claims
        // to be from another
        if (!ensemble.get(bookieIndex).equals(addr)) {
            return;
        }

        boolean ackQuorum = false;
        // the bookie reported success
        if (BKException.Code.OK == rc) {
            // Important: record the successful replica in the ack set; once
            // the number of successful replicas reaches the configured
            // ackQuorumSize, this returns true
            ackQuorum = ackSet.completeBookieAndCheck(bookieIndex);
            addEntrySuccessBookies.add(ensemble.get(bookieIndex));
        }
        // Normally false here; completed is only true on certain retry paths
        if (completed) {
            if (rc != BKException.Code.OK) {
                // Got an error after satisfying AQ. This means we are under replicated at the create itself.
                // Update the stat to reflect it.
                clientCtx.getClientStats().getAddOpUrCounter().inc();
                if (!clientCtx.getConf().disableEnsembleChangeFeature.isAvailable()
                        && !clientCtx.getConf().delayEnsembleChange) {
                    lh.notifyWriteFailed(bookieIndex, addr);
                }
            }
            
			// even the add operation is completed, but because we don't reset completed flag back to false when
            // #unsetSuccessAndSendWriteRequest doesn't break ack quorum constraint. we still have current pending
            // add op is completed but never callback. so do a check here to complete again.
            //
            // E.g. entry x is going to complete.
            //
            // 1) entry x + k hits a failure. lh.handleBookieFailure increases blockAddCompletions to 1, for ensemble
            //    change
            // 2) entry x receives all responses, sets completed to true but fails to send success callback because
            //    blockAddCompletions is 1
            // 3) ensemble change completed. lh unset success starting from x to x+k, but since the unset doesn't break
            //    ackSet constraint. #removeBookieAndCheck doesn't set completed back to false.
            // 4) so when the retry request on new bookie completes, it finds the pending op is already completed.
            //    we have to trigger #sendAddSuccessCallbacks
            //
            sendAddSuccessCallbacks();
            maybeRecycle();
            return;
        }
        // error responses from the bookie
        switch (rc) {
        case BKException.Code.OK:
            // continue
            break;
        case BKException.Code.ClientClosedException:
            // bookie client is closed.
            lh.errorOutPendingAdds(rc);
            return;
        case BKException.Code.IllegalOpException:
            // illegal operation requested, like using unsupported feature in v2 protocol
            lh.handleUnrecoverableErrorDuringAdd(rc);
            return;
        case BKException.Code.LedgerFencedException:
            LOG.warn("Fencing exception on write: L{} E{} on {}",
                    ledgerId, entryId, addr);
            lh.handleUnrecoverableErrorDuringAdd(rc);
            return;
        case BKException.Code.UnauthorizedAccessException:
            LOG.warn("Unauthorized access exception on write: L{} E{} on {}",
                    ledgerId, entryId, addr);
            lh.handleUnrecoverableErrorDuringAdd(rc);
            return;
        default:
            if (clientCtx.getConf().delayEnsembleChange) {
                if (ackSet.failBookieAndCheck(bookieIndex, addr)
                        || rc == BKException.Code.WriteOnReadOnlyBookieException) {
                    Map failedBookies = ackSet.getFailedBookies();
                    lh.handleBookieFailure(failedBookies);
                } else {
                }
            } else {
                lh.handleBookieFailure(ImmutableMap.of(bookieIndex, addr));
            }
            return;
        }
        // ackQuorum (see above): at least ackQuorumSize replicas acknowledged
        if (ackQuorum && !completed) {
            // Error case: writes must span a minimum number of fault domains,
            // but the acked bookies don't satisfy the placement policy
            if (clientCtx.getConf().enforceMinNumFaultDomainsForWrite
                && !(clientCtx.getPlacementPolicy()
                              .areAckedBookiesAdheringToPlacementPolicy(addEntrySuccessBookies,
                                                                        lh.getLedgerMetadata().getWriteQuorumSize(),
                                                                        lh.getLedgerMetadata().getAckQuorumSize()))) {
                clientCtx.getClientStats().getWriteDelayedDueToNotEnoughFaultDomains().inc();
                if (writeDelayedStartTime == -1) {
                    writeDelayedStartTime = MathUtils.nowInNano();
                }
            } else {
                // normal success path
                completed = true;
                this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos);

                if (writeDelayedStartTime != -1) {
                    clientCtx.getClientStats()
                             .getWriteDelayedDueToNotEnoughFaultDomainsLatency()
                             .registerSuccessfulEvent(MathUtils.elapsedNanos(writeDelayedStartTime),
                                                      TimeUnit.NANOSECONDS);
                }
                // the final step: fire the success callbacks
                sendAddSuccessCallbacks();
            }
        }
    }
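The ackSet bookkeeping in the success branch can be sketched as a set of distinct acked bookie indices checked against ackQuorumSize. This is a toy model with names of my choosing, not BookKeeper's actual DistributionSchedule ack-set implementation:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of ack-quorum tracking: completeBookieAndCheck returns true once
// enough distinct bookies have acknowledged the write.
public class AckQuorumSet {
    private final int ackQuorumSize;
    private final Set<Integer> ackedBookies = new HashSet<>();

    public AckQuorumSet(int ackQuorumSize) {
        this.ackQuorumSize = ackQuorumSize;
    }

    // called once per successful bookie response
    public boolean completeBookieAndCheck(int bookieIndex) {
        ackedBookies.add(bookieIndex); // duplicate acks from retries count once
        return ackedBookies.size() >= ackQuorumSize;
    }

    public static void main(String[] args) {
        AckQuorumSet ackSet = new AckQuorumSet(2);      // AQ=2 out of W=3
        if (ackSet.completeBookieAndCheck(0)) throw new AssertionError();  // 1 ack: not yet
        if (!ackSet.completeBookieAndCheck(2)) throw new AssertionError(); // 2 acks: quorum met
        if (!ackSet.completeBookieAndCheck(1)) throw new AssertionError(); // stays satisfied
    }
}
```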

At this point the message is durably stored.
What follows is callbacks all the way up: each layer unwinds to the one above it until the very first callback finally writes the message id back to the sending client.
Let's dissect one particularly important callback: the OpAddEntry used in the ManagedLedgerImpl send, since it also involves consumers.

    public void safeRun() {

        OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
        if (firstInQueue == null) {
            return;
        }
        // You might wonder whether polling the queue could return a different
        // op than this one. It can't: recall the line in LedgerHandle,
        //   clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
        // The same ledger always uses the same thread, so sends for one ledger
        // complete in submission order, and both the LedgerHandle and
        // pendingAddEntries belong to the same ManagedLedgerImpl.
        if (this != firstInQueue) {
            firstInQueue.failed(new ManagedLedgerException("Unexpected add entry op when complete the add entry op."));
            return;
        }

        ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
        ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
        // if there are active cursors (live consumers)
        if (ml.hasActiveCursors()) {
            // cache the just-persisted entry for tailing reads
            EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data);
            ml.entryCache.insert(entry);
            entry.release();
        }

        PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
        ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml);
        ml.lastConfirmedEntry = lastEntry;
        // closeWhenDone was set to true back in ManagedLedgerImpl when the
        // ledger filled up and a rollover was scheduled
        if (closeWhenDone) {
            ledger.asyncClose(this, ctx);
        } else {
            updateLatency();
            // the next callback up: the topic's
            AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
            if (cb != null) {
                cb.addComplete(lastEntry, data.asReadOnly(), ctx);
                // A consumer opens a long-lived connection to the broker and
                // asks for entries. If data is available, up to the configured
                // number of entries is read from the read position and pushed
                // to the consumer; if not, the request waits, and when new data
                // arrives the broker pushes it -- right here.
                // So Pulsar's consumption model is long connection + push.
                ml.notifyCursors();
                ml.notifyWaitingEntryCallBacks();
                ReferenceCountUtil.release(data);
                this.recycle();
            } else {
                ReferenceCountUtil.release(data);
            }
        }
    }
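The ordering guarantee that makes the head-of-queue check above safe can be sketched as a key-hashed pool of single-threaded executors: tasks with the same key always land on the same thread and run in submission order. This is a toy stand-in, not BookKeeper's actual OrderedExecutor:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of executeOrdered semantics: hash the key (e.g. a ledgerId) to one
// of N single-threaded executors, so same-key tasks run sequentially.
public class OrderedPool {
    private final ExecutorService[] lanes;

    public OrderedPool(int n) {
        lanes = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    public void executeOrdered(long key, Runnable task) {
        // same key -> same lane -> sequential execution
        lanes[(int) Math.floorMod(key, (long) lanes.length)].execute(task);
    }

    public void shutdownAndWait() {
        for (ExecutorService lane : lanes) {
            lane.shutdown();
            try {
                lane.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        OrderedPool pool = new OrderedPool(4);
        List<Integer> order = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 100; i++) {
            final int n = i;
            pool.executeOrdered(42L, () -> order.add(n)); // one key: strict FIFO
        }
        pool.shutdownAndWait();
        // submission order is preserved for the shared key
        for (int i = 0; i < 100; i++) {
            if (order.get(i) != i) throw new AssertionError();
        }
    }
}
```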

That is the full server-side implementation of message sending, walked through line by line. The next post moves on to the consumer internals, starting again from the consumer client.
