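Message storage: after a message is written to the CommitLog, the CommitLog write event is forwarded to the ConsumeQueue and the IndexFile, which triggers the writes to the IndexFile and the ConsumeQueue. Based on rocketmq-all-4.7.1.
The forwarding code is as follows: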
//BrokerController.java
class BrokerController {
    //...
    private MessageStore messageStore;
    //...
    public void start() throws Exception {
        //...
        this.messageStore.start();
        //...
    }
    //...
}
//DefaultMessageStore.java
class DefaultMessageStore implements MessageStore {
    //...
    private final ReputMessageService reputMessageService;
    //...
    public void start() throws Exception {
        //...
        this.reputMessageService.start();
        //...
    }
    //...
}
// Inner class of DefaultMessageStore
class ReputMessageService extends ServiceThread {
    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                //...
                Thread.sleep(1);
                this.doReput();
                //...
            } catch (Exception e) {
                //...
            }
        }
    }

    private void doReput() {
        //...
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
            //...
            DefaultMessageStore.this.doDispatch(dispatchRequest);
            //...
        }
        //...
    }
}
// Forward the CommitLog write event to the ConsumeQueue and the IndexFile
public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        // dispatcher is a CommitLogDispatcherBuildConsumeQueue or a CommitLogDispatcherBuildIndex
        dispatcher.dispatch(req);
    }
}
//K1 Dispatcher that builds the ConsumeQueue files
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            // Non-transactional and committed messages are written to the ConsumeQueue
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            // Prepared (half) and rolled-back messages are skipped
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}
//K1 Dispatcher that builds the IndexFile files
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    @Override
    public void dispatch(DispatchRequest request) {
        // The index is built only when message indexing is enabled in the store config
        if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
            DefaultMessageStore.this.indexService.buildIndex(request);
        }
    }
}
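To see the whole flow in one runnable piece, here is a minimal sketch of the same pattern: an append-only log, a reput position, and a list of dispatchers that each decide what to do with a request. It is not RocketMQ code. `DispatchSketch`, `Store`, `Request`, `TranType` and the in-memory lists are hypothetical stand-ins invented for illustration, and the internals of `putMessagePositionInfo` and `buildIndex` are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

public class DispatchSketch {
    // Hypothetical stand-in for the transaction types in MessageSysFlag.
    enum TranType { NOT_TYPE, PREPARED, COMMIT, ROLLBACK }

    // Hypothetical, heavily simplified stand-in for DispatchRequest.
    static final class Request {
        final long commitLogOffset;
        final String key;
        final TranType tranType;

        Request(long commitLogOffset, String key, TranType tranType) {
            this.commitLogOffset = commitLogOffset;
            this.key = key;
            this.tranType = tranType;
        }
    }

    // Same shape as CommitLogDispatcher: one method taking a request.
    interface Dispatcher {
        void dispatch(Request req);
    }

    static class Store {
        final List<Request> commitLog = new ArrayList<>();   // stands in for the CommitLog
        final List<Long> consumeQueue = new ArrayList<>();   // stands in for the ConsumeQueue
        final List<String> index = new ArrayList<>();        // stands in for the IndexFile
        final List<Dispatcher> dispatcherList = new ArrayList<>();
        int reputFrom = 0; // position of the next log entry to dispatch

        Store(boolean indexEnabled) {
            // ConsumeQueue dispatcher: only normal and committed messages are recorded.
            dispatcherList.add(req -> {
                switch (req.tranType) {
                    case NOT_TYPE:
                    case COMMIT:
                        consumeQueue.add(req.commitLogOffset);
                        break;
                    case PREPARED:
                    case ROLLBACK:
                        break;
                }
            });
            // Index dispatcher: only checks the enable flag in this sketch.
            dispatcherList.add(req -> {
                if (indexEnabled) {
                    index.add(req.key);
                }
            });
        }

        void append(Request req) {
            commitLog.add(req);
        }

        // One pass of the reput loop: hand every not-yet-dispatched entry to every dispatcher.
        void doReput() {
            while (reputFrom < commitLog.size()) {
                Request req = commitLog.get(reputFrom++);
                for (Dispatcher dispatcher : dispatcherList) {
                    dispatcher.dispatch(req);
                }
            }
        }
    }

    public static void main(String[] args) {
        Store store = new Store(true);
        store.append(new Request(0L, "k0", TranType.NOT_TYPE));
        store.append(new Request(100L, "k1", TranType.PREPARED));
        store.append(new Request(200L, "k2", TranType.COMMIT));
        store.doReput();
        System.out.println("consumeQueue=" + store.consumeQueue); // consumeQueue=[0, 200]
        System.out.println("index=" + store.index);               // index=[k0, k1, k2]
    }
}
```

The prepared message at offset 100 never reaches the consume queue in this sketch, which mirrors the `switch` in `CommitLogDispatcherBuildConsumeQueue` above. In the real broker the pass is driven by the `ReputMessageService` thread roughly once per millisecond rather than by an explicit call.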
How to guarantee that messages are not lost
The explanation of Raft here was wrong; it is not that simple:



