栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rocketmq 源码分析笔记

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rocketmq 源码分析笔记

基于 rocketmq-all-4.7.1

消息存储 写入CommitLog后 ,将commitLog写入的事件转发到ComsumeQueue和IndexFile ,从而引发:写入IndexFile、ConsumeQueue。

找到该转发代码 如下:


//BrokerController.java:
class BrokerController{
//...
private MessageStore messageStore;//
//...
public void start() throws Exception {
//...
this.messageStore.start();
//...
}
//...
}

//DefaultMessageStore.java
class DefaultMessageStore  implements MessageStore{
	//...
	private final ReputMessageService reputMessageService;
	//...
	public void start() throws Exception {
	
	//...
	this.reputMessageService.start();
	//...
	}
	
	//...

}


class ReputMessageService extends ServiceThread {

	@Override
	public void run() {
	            while (!this.isStopped()) 
	            {
					//...
                    Thread.sleep(1);
                    this.doReput();
					//...
            	}
	}
}



    private void doReput() {
		//...
	    for (boolean donext = true; this.isCommitLogAvailable() && doNext; ) 
	    {
	    //...
	    DefaultMessageStore.this.doDispatch(dispatchRequest);
	    //...
	    }
	    //...

	}
// 将commitLog写入的事件转发到ComsumeQueue和IndexFile
    public void doDispatch(DispatchRequest req) {
        for (CommitLogDispatcher dispatcher : this.dispatcherList) {
            dispatcher.dispatch(req);//dispatcher 是 CommitLogDispatcherBuildConsumeQueue 或 CommitLogDispatcherBuildIndex 
        }
    }



    //K1 Consumequeue文件分发的构建器
    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }
    //K1 IndexFile文件分发的构建器
    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    }



如何保证消息不丢

关于raft这个地方讲错了,没这么简单:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/324561.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号