1. Server-side entry point for message sending
The previous post covered the client-side send implementation. Once the client has sent a message to the broker, how does the server handle it? Let's walk through the source.
public class ServerCnx {
    protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
        // Look up the producer that was stored when it was created
        CompletableFuture<Producer> producerFuture = producers.get(send.getProducerId());
        Producer producer = producerFuture.getNow(null);
        // For non-persistent topics, check whether the pending message count exceeds the maximum (default 1000)
        if (producer.isNonPersistentTopic()) {
            if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) {
                final long producerId = send.getProducerId();
                final long sequenceId = send.getSequenceId();
                final long highestSequenceId = send.getHighestSequenceId();
                service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), SafeRun.safeRun(() -> {
                    commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1, -1);
                }));
                producer.recordMessageDrop(send.getNumMessages());
                return;
            } else {
                nonPersistentPendingMessages++;
            }
        }
        // Check the current publish rate and whether the pending publish buffer exceeds its limit
        startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());
        // Transaction handling omitted
        // About highestSequenceId vs. sequenceId:
        // The client sends in batches, and every message in a batch has its own sequenceId
        // (a batch may hold just one message). The largest one in the batch is highestSequenceId,
        // the smallest is the lowest, and here the lowest is also send.getSequenceId(),
        // because a batch's sequenceId is taken from the first message added to it.
        if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
            // Batched send
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
                    headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
        } else {
            // Single message
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
                    send.getNumMessages(), send.isIsChunk(), send.isMarker());
        }
    }
}
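startSendOperation is not shown above, but conceptually it bumps a pending-publish counter and, once the configured limits are exceeded, stops reading further commands from the connection until in-flight writes drain. A minimal sketch of that style of accounting; the class, field, and limit names are illustrative, not Pulsar's:

// Illustrative sketch of publish back-pressure accounting; all names are hypothetical.
class PendingPublishLimiter {
    private long pendingBytes;            // bytes accepted but not yet persisted
    private final long maxPendingBytes;   // configured ceiling

    PendingPublishLimiter(long maxPendingBytes) {
        this.maxPendingBytes = maxPendingBytes;
    }

    // called on every CommandSend, mirroring what startSendOperation tracks
    synchronized boolean start(long msgSize) {
        pendingBytes += msgSize;
        return pendingBytes <= maxPendingBytes; // false => stop reading from the socket
    }

    // called from the persistence callback, mirroring completedSendOperation
    synchronized boolean complete(long msgSize) {
        pendingBytes -= msgSize;
        return pendingBytes <= maxPendingBytes / 2; // resume reads once partially drained
    }
}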
The key call is producer.publishMessage(...); we'll follow the most involved path, the batched send.
2. Sending in Producer — first-layer callback: MessagePublishContext
public class Producer {
    public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
            ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) {
        // If the lowest sequence id is greater than the highest, the client's batching is clearly broken
        if (lowestSequenceId > highestSequenceId) {
            cnx.execute(() -> {
                cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
                        "Invalid lowest or highest sequence id");
                cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
            });
            return;
        }
        // Verifies message integrity and encryption, and records the publish rate for throttling
        if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) {
            // Publish
            publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked,
                    isMarker);
        }
    }
}
The key call here is publishMessageToTopic(...):
private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
        long batchSize, boolean isChunked, boolean isMarker) {
    // Ultimately the topic performs the publish.
    // MessagePublishContext packs all the send parameters into a per-request context object,
    // and that same object doubles as the completion callback.
    topic.publishMessage(headersAndPayload,
            MessagePublishContext.get(this, lowestSequenceId,
                    highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
                    isChunked, System.nanoTime(), isMarker));
}
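MessagePublishContext is both a parameter bag and the completion callback, and it is pooled so the hot publish path avoids allocations. Below is a stripped-down sketch of that pattern using Netty's Recycler; the fields are reduced to a minimum and only loosely modeled on the real class:

// Sketch of a pooled publish context, loosely modeled on MessagePublishContext.
import io.netty.util.Recycler;

class PublishContext {
    private long sequenceId;
    private long startTimeNs;
    private final Recycler.Handle<PublishContext> handle;

    private PublishContext(Recycler.Handle<PublishContext> handle) { this.handle = handle; }

    private static final Recycler<PublishContext> RECYCLER = new Recycler<PublishContext>() {
        @Override
        protected PublishContext newObject(Handle<PublishContext> handle) {
            return new PublishContext(handle);
        }
    };

    static PublishContext get(long sequenceId) {
        PublishContext ctx = RECYCLER.get();      // reuse a pooled instance when available
        ctx.sequenceId = sequenceId;
        ctx.startTimeNs = System.nanoTime();
        return ctx;
    }

    // invoked when the write completes, mirroring MessagePublishContext.completed(...)
    void completed(Exception e, long ledgerId, long entryId) {
        // ... a real implementation would send the receipt or error to the client here ...
        recycle();
    }

    private void recycle() {
        sequenceId = -1;
        handle.recycle(this);                     // return the object to the pool
    }
}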
Next we follow the persistent-topic publish path.
The non-persistent implementation is simple: on receiving a message, find the consumers subscribed to the topic and dispatch it to them; if a peer (replicated) cluster exists, forward a copy there. That's the whole story — see the toy model below.
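As a contrast with the persistent path analyzed next, here is a self-contained toy model of that non-persistent flow; every name in it is illustrative, not the real NonPersistentTopic internals:

// Toy model of the non-persistent path: no storage, fan out in memory, ack at once.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class ToyNonPersistentTopic {
    private final List<Consumer<byte[]>> subscribers = new CopyOnWriteArrayList<>();
    private final List<Consumer<byte[]>> peerClusterReplicators = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<byte[]> consumer) { subscribers.add(consumer); }

    void addReplicator(Consumer<byte[]> replicator) { peerClusterReplicators.add(replicator); }

    void publish(byte[] payload, Runnable onComplete) {
        subscribers.forEach(s -> s.accept(payload));            // dispatch to local consumers
        peerClusterReplicators.forEach(r -> r.accept(payload)); // forward to peer clusters
        onComplete.run();                                       // ack immediately; nothing was persisted
    }
}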
3. Sending in PersistentTopic — second-layer callback: the topic itself
public class PersistentTopic {
    @Override
    public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
        // Increment the pending-write counter; it is normally decremented in the success callback (analyzed later)
        pendingWriteOps.incrementAndGet();
        // The topic may have been fenced, e.g. deleted
        if (isFenced) {
            publishContext.completed(new TopicFencedException("fenced"), -1, -1);
            // Decrement the pending counter mentioned above; every failure path below must do the same.
            // It also checks whether teardown should run: disconnect every producer and consumer of this topic,
            // close the current ledger, close the bundle, and stop replication to peer clusters.
            // In short: tear down everything associated with the topic.
            decrementPendingWriteOpsAndCheck();
            return;
        }
        // Check whether the message exceeds the configured maximum size;
        // this setting can be applied down to the topic level
        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
            publishContext.completed(new NotAllowedException("Exceed maximum message size")
                    , -1, -1);
            // Same as above
            decrementPendingWriteOpsAndCheck();
            return;
        }
        // Deduplication: has this message already been persisted?
        MessageDeduplication.MessageDupStatus status =
                messageDeduplication.isDuplicate(publishContext, headersAndPayload);
        switch (status) {
            case NotDup:
                // Not a duplicate: append the entry
                asyncAddEntry(headersAndPayload, publishContext);
                break;
            case Dup:
                // Immediately acknowledge duplicated message
                publishContext.completed(null, -1, -1);
                decrementPendingWriteOpsAndCheck();
                break;
            default:
                publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1);
                decrementPendingWriteOpsAndCheck();
        }
    }
}
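The dedup decision is sequence-id based: the broker remembers the highest sequence id persisted per producer, and anything at or below it is treated as a duplicate. A minimal sketch of that rule, assuming a bare map (the real MessageDeduplication also tracks in-flight ids and snapshots its state to a cursor, which is where the Unknown status comes from):

// Minimal sequence-id dedup, sketching the idea behind MessageDeduplication.isDuplicate.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SimpleDedup {
    enum Status { NotDup, Dup }

    private final Map<String, Long> highestSequencePersisted = new ConcurrentHashMap<>();

    Status isDuplicate(String producerName, long sequenceId) {
        Long last = highestSequencePersisted.get(producerName);
        if (last != null && sequenceId <= last) {
            return Status.Dup; // already persisted: ack immediately, don't write again
        }
        highestSequencePersisted.put(producerName, sequenceId);
        return Status.NotDup;
    }
}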
The key call is asyncAddEntry(headersAndPayload, publishContext):
private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
    // Is the broker entry metadata interceptor enabled?
    if (brokerService.isBrokerEntryMetadataEnabled()) {
        ledger.asyncAddEntry(headersAndPayload,
                (int) publishContext.getNumberOfMessages(), this, publishContext);
    } else {
        ledger.asyncAddEntry(headersAndPayload, this, publishContext);
    }
}
On to ledger.asyncAddEntry(...):
4. Sending in ManagedLedgerImpl — third-layer callback: OpAddEntry
public class ManagedLedgerImpl {
    public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx) {
        // Wrap everything in an OpAddEntry, which also serves as the callback
        OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);
        // Appends for the same topic always run on the same thread
        executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
    }
}
We'll come back to what OpAddEntry does on completion.
Pause and take stock: there are already several callback layers.
1. Producer delegates the publish to the topic; the callback is MessagePublishContext.
2. The topic delegates to the managed ledger; the callback is the topic object itself.
3. The managed ledger issues the write; the callback is OpAddEntry.
The granularity runs from coarse to fine, and completions will later unwind layer by layer back up.
Why design it this way? Each layer has its own responsibility; replacing synchronous calls with callbacks widens concurrency and lifts overall throughput. A toy illustration of the pattern follows below.
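Here is a self-contained toy illustration of that layering — not Pulsar code: each layer hands a decorated callback down, and the completion unwinds upward through every layer on the I/O thread:

// Toy illustration of layered callbacks: completion bubbles from layer 3 back to layer 1.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LayeredCallbacks {
    interface Callback { void done(long entryId); }

    static final ExecutorService io = Executors.newSingleThreadExecutor();

    // layer 3, "bookkeeper client": completes asynchronously on its own thread
    static void bkAddEntry(Callback cb) {
        io.execute(() -> cb.done(7L)); // 7 stands in for the assigned entry id
    }

    // layer 2, "managed ledger": decorates the completion (stats, entry cache, ...)
    static void ledgerAddEntry(Callback cb) {
        bkAddEntry(entryId -> { /* ledger-level bookkeeping here */ cb.done(entryId); });
    }

    // layer 1, "topic": decorates again (dedup bookkeeping) before the client sees a receipt
    static void topicPublish(Callback cb) {
        ledgerAddEntry(entryId -> { /* topic-level bookkeeping here */ cb.done(entryId); });
    }

    public static void main(String[] args) {
        topicPublish(entryId -> System.out.println("send receipt for entry " + entryId));
        io.shutdown();
    }
}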
Continuing with internalAsyncAddEntry(addOperation):
public class ManagedLedgerImpl {
    // Synchronized, because many stats fields (entry count, ledger size, ...) would otherwise race
    private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
        // Interceptor hook
        if (!beforeAddEntry(addOperation)) {
            return;
        }
        // Current state of this managed ledger
        final State state = STATE_UPDATER.get(this);
        // Failure-state checks throughout; self-explanatory
        if (state == State.Fenced) {
            addOperation.failed(new ManagedLedgerFencedException());
            return;
        } else if (state == State.Terminated) {
            addOperation.failed(new ManagedLedgerTerminatedException("Managed ledger was already terminated"));
            return;
        } else if (state == State.Closed) {
            addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
            return;
        } else if (state == State.WriteFailed) {
            addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
            return;
        }
        // Queue the pending entry; it is removed in the BookKeeper success callback
        pendingAddEntries.add(addOperation);
        // Transitional states
        if (state == State.ClosingLedger || state == State.CreatingLedger) {
            if (State.CreatingLedger == state) {
                long elapsedMs = System.currentTimeMillis() - this.lastLedgerCreationInitiationTimestamp;
                if (elapsedMs > TimeUnit.SECONDS.toMillis(2 * config.getMetadataOperationsTimeoutSeconds())) {
                    this.createComplete(Code.TimeoutException, null, null);
                }
            }
        } else if (state == State.ClosedLedger) {
            if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
                this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
                mbean.startDataLedgerCreateOp();
                asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
            }
        } else {
            // Attach the current ledger to this operation
            addOperation.setLedger(currentLedger);
            // Count the new entry. Note a batch still counts as +1, not one per inner message:
            // a batch is written as a single entry that merely contains several messages.
            ++currentLedgerEntries;
            // Add to the ledger's running byte total
            currentLedgerSize += addOperation.data.readableBytes();
            // If the ledger is full (entry count, total size, or rollover time exceeded),
            // close it and switch to a new one
            if (currentLedgerIsFull()) {
                addOperation.setCloseWhenDone(true);
                STATE_UPDATER.set(this, State.ClosingLedger);
            }
            // Kick off the write
            addOperation.initiate();
        }
    }
}
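currentLedgerIsFull() is referenced above but not shown. A simplified model of the decision it makes, with illustrative limit values (the real predicate also considers the ledger's state and a minimum rollover interval; the names and defaults below are assumptions, not Pulsar's configuration):

// Simplified model of the ledger-rollover decision; all names and limits are illustrative.
class LedgerRolloverPolicy {
    long maxEntriesPerLedger = 50_000;
    long maxLedgerSizeBytes = 1_073_741_824L;     // 1 GiB
    long maxRolloverTimeMs = 4 * 60 * 60 * 1000L; // 4 hours

    // any one trigger firing is enough to close the current ledger and open a new one
    boolean isFull(long currentEntries, long currentSizeBytes, long ledgerCreatedAtMs) {
        return currentEntries >= maxEntriesPerLedger
                || currentSizeBytes >= maxLedgerSizeBytes
                || System.currentTimeMillis() - ledgerCreatedAtMs >= maxRolloverTimeMs;
    }
}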
The key is the last line, addOperation.initiate():
public class OpAddEntry {
    public void initiate() {
        if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) {
            ByteBuf duplicateBuffer = data.retainedDuplicate();
            addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
            lastInitTime = System.nanoTime();
            // Note: this ledger is no longer the ManagedLedger from above; it is a LedgerHandle,
            // the client used to talk to BookKeeper
            ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
        } else {
            // state was not OPEN; error handling omitted here
        }
    }
}
On to ledger.asyncAddEntry(...):
5. Sending in LedgerHandle — fourth-layer callback: PendingAddOp
public class LedgerHandle {
    public void asyncAddEntry(ByteBuf data, final AddCallback cb, final Object ctx) {
        // getCurrentEnsemble() fetches the current list of bookies; the write below targets them
        PendingAddOp op = PendingAddOp.create(this, clientCtx, getCurrentEnsemble(), data, writeFlags, cb, ctx);
        doAsyncAddEntry(op);
    }
}
And into doAsyncAddEntry(op):
public class LedgerHandle {
    protected void doAsyncAddEntry(final PendingAddOp op) {
        // Flow control on BookKeeper requests
        if (throttler != null) {
            throttler.acquire();
        }
        boolean wasClosed = false;
        synchronized (this) {
            // Client-side handle state check
            if (isHandleWritable()) {
                // Note the entryId is generated client-side by simple increment:
                // it only has to be unique within a single ledger,
                // so it is the previously pushed entryId + 1
                long entryId = ++lastAddPushed;
                long currentLedgerLength = addToLength(op.payload.readableBytes());
                op.setEntryId(entryId);
                op.setLedgerLength(currentLedgerLength);
                // Again queued while awaiting the write; removed in the callback
                pendingAddOps.add(op);
            } else {
                wasClosed = true;
            }
        }
        // Failure path; skipped in this analysis
        if (wasClosed) {
            try {
                clientCtx.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() {
                    @Override
                    public void safeRun() {
                        op.cb.addCompleteWithLatency(BKException.Code.LedgerClosedException,
                                LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
                    }
                    @Override
                    public String toString() {
                        return String.format("AsyncAddEntryToClosedLedger(lid=%d)", ledgerId);
                    }
                });
            } catch (RejectedExecutionException e) {
                op.cb.addCompleteWithLatency(BookKeeper.getReturnRc(clientCtx.getBookieClient(),
                        BKException.Code.InterruptedException),
                        LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
            }
            return;
        }
        // This line matters: it is one of the highlights of Pulsar's storage design.
        // It is explained separately right after this code; read that first, then come back here.
        DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId());
        try {
            // Verify that the replicas in the computed write set are all writable
            if (!waitForWritable(ws, 0, clientCtx.getConf().waitForWriteSetMs)) {
                op.allowFailFastOnUnwritableChannel();
            }
        } finally {
            ws.recycle();
        }
        try {
            // Executed on the worker pool; the same ledgerId always maps to the same thread
            clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
        } catch (RejectedExecutionException e) {
            op.cb.addCompleteWithLatency(
                    BookKeeper.getReturnRc(clientCtx.getBookieClient(), BKException.Code.InterruptedException),
                    LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx);
        }
    }
}
A digression to explain DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(op.getEntryId()).
Its internals boil down to the code below.
To keep written data highly available, three settings work together:
ensembleSize: how many storage nodes (bookies) are used in total
writeQuorumSize: how many replicas each entry is written to
ackQuorumSize: how many replica acks count as success
With the code below, if we operate on 5 bookies, write 3 replicas, and require 3 acks, a request succeeds once all 3 writes succeed.
Label the 5 bookies 1, 2, 3, 4, 5.
Since entryId increments by one each time (the ++lastAddPushed above),
the first entry is written to bookies 1, 2, 3;
the second to 2, 3, 4;
the third to 3, 4, 5;
the fourth to 4, 5, 1;
…
This is striped writing.
// Inside the round-robin distribution schedule: pick writeQuorumSize bookie indexes,
// rotating through the ensemble as entryId grows
private void reset(int ensembleSize, int writeQuorumSize,
                   long entryId) {
    setSize(writeQuorumSize);
    for (int w = 0; w < writeQuorumSize; w++) {
        set(w, (int) ((entryId + w) % ensembleSize));
    }
}
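Plugging numbers into the formula (entryId + w) % ensembleSize confirms the striping described above. This small demo prints the write set for the first few entries (indexes are 0-based, so index 0 is the bookie labeled 1 above):

// Worked example of the striping formula (entryId + w) % ensembleSize.
public class WriteSetDemo {
    public static void main(String[] args) {
        int ensembleSize = 5, writeQuorumSize = 3;
        for (long entryId = 0; entryId < 4; entryId++) {
            StringBuilder ws = new StringBuilder();
            for (int w = 0; w < writeQuorumSize; w++) {
                ws.append((entryId + w) % ensembleSize).append(' ');
            }
            System.out.println("entry " + entryId + " -> bookie indexes " + ws);
        }
        // prints: entry 0 -> 0 1 2, entry 1 -> 1 2 3, entry 2 -> 2 3 4, entry 3 -> 3 4 0
    }
}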
Back in doAsyncAddEntry the op was handed to the worker pool via clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op),
so op must implement Runnable directly or indirectly. Let's look straight at its run logic (SafeRunnable's run() delegates to safeRun()).
class PendingAddOp {
    public void safeRun() {
        hasRun = true;
        if (callbackTriggered) {
            maybeRecycle();
            return;
        }
        this.requestTimeNanos = MathUtils.nowInNano();
        checkNotNull(lh);
        checkNotNull(lh.macManager);
        // Package the payload for sending (digest, headers, last-add-confirmed, ...)
        this.toSend = lh.macManager.computeDigestAndPackageForSending(
                entryId, lh.lastAddConfirmed, currentLedgerLength,
                payload);
        payload = null;
        lh.maybeHandleDelayedWriteBookieFailure();
        // Covered above: compute the write set for this entry
        DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);
        try {
            // Loop over the write set, issuing one write per replica
            for (int i = 0; i < writeSet.size(); i++) {
                sendWriteRequest(ensemble, writeSet.get(i));
            }
        } finally {
            writeSet.recycle();
        }
    }
}
Next, sendWriteRequest(ensemble, writeSet.get(i)):
class PendingAddOp {
    void sendWriteRequest(List<BookieId> ensemble, int bookieIndex) {
        int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : FLAG_NONE;
        // toSend is the packaged payload.
        // The caller passes each replica index in turn; ensemble.get(bookieIndex)
        // resolves the index to the actual bookie
        clientCtx.getBookieClient().addEntry(ensemble.get(bookieIndex),
                lh.ledgerId, lh.ledgerKey, entryId, toSend, this, bookieIndex,
                flags, allowFailFast, lh.writeFlags);
        // Count of in-flight write requests; decremented in the bookie's response callback
        ++pendingWriteRequests;
    }
}
Note that addEntry above is invoked on the BookieClient, and the current PendingAddOp is once again the callback:
it implements the interface below, and writeComplete fires once per successful replica write.
We'll stop descending here — there is plenty more underneath (addEntry has its own callbacks, the request still has to be framed per the BookKeeper wire protocol, and so on); we'll cover that when analyzing BookKeeper itself.
So assume the bookie responded with success and writeComplete was invoked. Here is its implementation.
public interface WriteCallback {
    void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx);
}
public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
    int bookieIndex = (Integer) ctx;
    --pendingWriteRequests;
    // Defensive check: the request went to bookie1 but the response claims bookie2
    if (!ensemble.get(bookieIndex).equals(addr)) {
        return;
    }
    boolean ackQuorum = false;
    // The bookie handled the write successfully
    if (BKException.Code.OK == rc) {
        // Important: record the successful replica in the ack set;
        // once the number of successful replicas reaches the configured ackQuorumSize,
        // completeBookieAndCheck returns true
        ackQuorum = ackSet.completeBookieAndCheck(bookieIndex);
        addEntrySuccessBookies.add(ensemble.get(bookieIndex));
    }
    // Normally false; only entered in corner cases after the op already completed
    if (completed) {
        if (rc != BKException.Code.OK) {
            // Got an error after satisfying AQ. This means we are under replicated at the create itself.
            // Update the stat to reflect it.
            clientCtx.getClientStats().getAddOpUrCounter().inc();
            if (!clientCtx.getConf().disableEnsembleChangeFeature.isAvailable()
                    && !clientCtx.getConf().delayEnsembleChange) {
                lh.notifyWriteFailed(bookieIndex, addr);
            }
        }
        // even the add operation is completed, but because we don't reset completed flag back to false when
        // #unsetSuccessAndSendWriteRequest doesn't break ack quorum constraint. we still have current pending
        // add op is completed but never callback. so do a check here to complete again.
        //
        // E.g. entry x is going to complete.
        //
        // 1) entry x + k hits a failure. lh.handleBookieFailure increases blockAddCompletions to 1, for ensemble
        //    change
        // 2) entry x receives all responses, sets completed to true but fails to send success callback because
        //    blockAddCompletions is 1
        // 3) ensemble change completed. lh unset success starting from x to x+k, but since the unset doesn't break
        //    ackSet constraint. #removeBookieAndCheck doesn't set completed back to false.
        // 4) so when the retry request on new bookie completes, it finds the pending op is already completed.
        //    we have to trigger #sendAddSuccessCallbacks
        //
        sendAddSuccessCallbacks();
        maybeRecycle();
        return;
    }
    // Error responses from the bookie
    switch (rc) {
        case BKException.Code.OK:
            // continue
            break;
        case BKException.Code.ClientClosedException:
            // bookie client is closed.
            lh.errorOutPendingAdds(rc);
            return;
        case BKException.Code.IllegalOpException:
            // illegal operation requested, like using unsupported feature in v2 protocol
            lh.handleUnrecoverableErrorDuringAdd(rc);
            return;
        case BKException.Code.LedgerFencedException:
            LOG.warn("Fencing exception on write: L{} E{} on {}",
                    ledgerId, entryId, addr);
            lh.handleUnrecoverableErrorDuringAdd(rc);
            return;
        case BKException.Code.UnauthorizedAccessException:
            LOG.warn("Unauthorized access exception on write: L{} E{} on {}",
                    ledgerId, entryId, addr);
            lh.handleUnrecoverableErrorDuringAdd(rc);
            return;
        default:
            if (clientCtx.getConf().delayEnsembleChange) {
                if (ackSet.failBookieAndCheck(bookieIndex, addr)
                        || rc == BKException.Code.WriteOnReadOnlyBookieException) {
                    Map<Integer, BookieId> failedBookies = ackSet.getFailedBookies();
                    lh.handleBookieFailure(failedBookies);
                }
            } else {
                lh.handleBookieFailure(ImmutableMap.of(bookieIndex, addr));
            }
            return;
    }
    // ackQuorum was set above: true once at least ackQuorumSize replicas have acked
    if (ackQuorum && !completed) {
        // Failure path: fault-domain placement is enforced but the acked bookies don't yet satisfy it
        if (clientCtx.getConf().enforceMinNumFaultDomainsForWrite
                && !(clientCtx.getPlacementPolicy()
                        .areAckedBookiesAdheringToPlacementPolicy(addEntrySuccessBookies,
                                lh.getLedgerMetadata().getWriteQuorumSize(),
                                lh.getLedgerMetadata().getAckQuorumSize()))) {
            clientCtx.getClientStats().getWriteDelayedDueToNotEnoughFaultDomains().inc();
            if (writeDelayedStartTime == -1) {
                writeDelayedStartTime = MathUtils.nowInNano();
            }
        } else {
            // The normal path
            completed = true;
            this.qwcLatency = MathUtils.elapsedNanos(requestTimeNanos);
            if (writeDelayedStartTime != -1) {
                clientCtx.getClientStats()
                        .getWriteDelayedDueToNotEnoughFaultDomainsLatency()
                        .registerSuccessfulEvent(MathUtils.elapsedNanos(writeDelayedStartTime),
                                TimeUnit.NANOSECONDS);
            }
            // The last line: fire the success callbacks up the chain
            sendAddSuccessCallbacks();
        }
    }
}
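The pivotal call in the success path is ackSet.completeBookieAndCheck(bookieIndex). A minimal sketch of the bookkeeping it implies, counting distinct successful replicas against ackQuorumSize (the real AckSet also handles failures, ensemble changes, and recycling):

// Minimal ack-quorum tracker, sketching the idea behind ackSet.completeBookieAndCheck.
import java.util.BitSet;

class SimpleAckSet {
    private final BitSet acked = new BitSet();
    private final int ackQuorumSize;

    SimpleAckSet(int ackQuorumSize) { this.ackQuorumSize = ackQuorumSize; }

    // returns true once enough distinct bookies have acked this entry
    synchronized boolean completeBookieAndCheck(int bookieIndex) {
        acked.set(bookieIndex);                      // idempotent per bookie index
        return acked.cardinality() >= ackQuorumSize; // distinct successes vs. the quorum
    }
}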
At this point the message has been persisted.
What remains is callbacks all the way up, layer by layer, until the very first one responds to the sending client with the message id.
Let's unpack one of the more important ones: the OpAddEntry callback registered in ManagedLedgerImpl, because it also touches the consumer side.
public void safeRun() {
    OpAddEntry firstInQueue = ml.pendingAddEntries.poll();
    if (firstInQueue == null) {
        return;
    }
    // You might wonder about this check and the queue poll above:
    // couldn't polling return some other op? It can't.
    // Recall this line in LedgerHandle: clientCtx.getMainWorkerPool().executeOrdered(ledgerId, op);
    // the same ledger always uses the same thread, so writes for a ledger complete in order,
    // and both the LedgerHandle and pendingAddEntries are owned by this ManagedLedgerImpl.
    if (this != firstInQueue) {
        firstInQueue.failed(new ManagedLedgerException("Unexpected add entry op when complete the add entry op."));
        return;
    }
    ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml);
    ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength);
    // If there are active cursors (live consumers),
    // put the freshly written entry into the entry cache
    if (ml.hasActiveCursors()) {
        EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data);
        ml.entryCache.insert(entry);
        entry.release();
    }
    PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
    ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml);
    ml.lastConfirmedEntry = lastEntry;
    // Covered above in ManagedLedgerImpl: if the ledger was full before this write,
    // closeWhenDone was flipped to true to trigger the switch to a new ledger
    if (closeWhenDone) {
        ledger.asyncClose(this, ctx);
    } else {
        updateLatency();
        // The next callback layer up: the topic's
        AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
        if (cb != null) {
            cb.addComplete(lastEntry, data.asReadOnly(), ctx);
            // After a consumer starts, it keeps a long-lived connection to the broker and proactively
            // requests entries: if data is available, up to the configured number of entries is read
            // from the current read position and pushed to the consumer; if not, the read parks and
            // waits. When new data arrives, the broker pushes it to the waiting consumer — and this
            // is where that push is triggered. So Pulsar's consumption model is long connection + push.
            ml.notifyCursors();
            ml.notifyWaitingEntryCallBacks();
            ReferenceCountUtil.release(data);
            this.recycle();
        } else {
            ReferenceCountUtil.release(data);
        }
    }
}
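The park/notify handshake behind ml.notifyCursors() can be reduced to the following self-contained sketch; the names are simplified (the real ManagedLedgerImpl keeps a queue of waiting cursors and invokes their notifyEntriesAvailable, which re-triggers the read):

// Condensed sketch of the park/notify handshake behind notifyCursors.
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class WaitingCursors {
    // cursors that have caught up and are waiting for the next entry
    private final Queue<Runnable> waiting = new ConcurrentLinkedQueue<>();

    // a cursor with nothing left to read parks its read callback here
    void parkRead(Runnable readCallback) { waiting.add(readCallback); }

    // called from OpAddEntry's completion: wake every parked cursor
    void notifyCursors() {
        Runnable r;
        while ((r = waiting.poll()) != null) {
            r.run(); // re-issues the read; the dispatcher then pushes the entry to consumers
        }
    }
}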
That is the complete server-side implementation of message sending, traced line by line — hopefully it was worth the walk. The next post digs into how consumers work under the hood, starting again from the consumer client.



