我们先来看一下RocketMQ的消息存储流程,当消息发送到RocketMQ上时,会被顺序写入CommitLog文件,这样能保证消息存储的高性能和高吞吐量。
但是消息是按照Topic来消费的,如果消费时从CommitLog上查找对应的消息时,会比较慢。为了提高消息消费的效率,RocketMQ会将Topic一样的消息放在ConsumerQueue中,每个ConsumerQueue又分为几个写队列,一个队列一个文件。
假如创建一个名为TopicTest的topic,并创建4个写队列。那么在RocketMQ是通过如下形式存储的
需要注意的是,CommitLog和ComsumerQueue并不是将相同的消息存储了2份。CommitLog存储了消息原始的内容,而ComsumerQueue主要存储了消息在CommitLog中的偏移量,具体的消息格式看下图
borker端存储的消息格式如下
| 内容 | 解释 | 长度 |
|---|---|---|
| TOTALSIZE | 消息总长度 | 4字节 |
| MAGICCODE | 魔术,固定值Oxdaa320a7 | 4字节 |
| BODYCRC | 消息crc校验码 | 4字节 |
| QUEUEID | 消息队列id | 4字节 |
| FLAG | 消息flag,供应用程序使用 | 4字节 |
| QUEUEOFFSET | 消息在消费队列的偏移量 | 8字节 |
| PHYSICALOFFSET | 消息在CommitLog文件中的偏移量 | 8字节 |
| SYSFLAG | 消息系统flag,例如是否压缩,是否是事务消息等 | 4字节 |
| BORNTIMESTAMP | 生产者调用消息发送API的时间戳 | 8字节 |
| BORNHOST | 消息发送者ip,端口号 | 8字节 |
| STORETIMESTAMP | 消息存储时间戳 | 8字节 |
| STOREHOSTADDRESS | Broker服务器ip+端口号 | 8字节 |
| RECONSUMETIMES | 消息重试次数 | 4字节 |
| Prepared Transaction Offset | 事务消息物理偏移量 | 8字节 |
| BodyLength | 消息体长度 | 4字节 |
| Body | 消息体内容 | BodyLength字节 |
| TopicLength | topic长度,1字节,即主题名称不能超过255个字符 | 1字节 |
| Topic | 主题 | TopicLength字节 |
| PropertiesLength | 消息属性长度 | 2字节 |
| Properties | 消息属性 | PropertiesLength字节 |
ConsumerQueue中消息的格式如下
根据commitlog offset 和 size 就能从IndexFile中获取到具体的消息内容,而 tag hashcode 用来根据topic+tag消费时过滤消息
从存储图看到还有一个IndexFile和CommitLog也有关系
IndexFile的主要作用就是用来根据Message Key和Unique Key查找对应的消息
IndexFile文件结构如下所示
从图中可以看出,IndexFile主要分为如下3部分,IndexHead,Hash槽,Index条目
IndexHead的格式如下
| 字段 | 解释 |
|---|---|
| beginTimestamp | 消息的最小存储时间 |
| endTimestamp | 消息的最大存储时间 |
| beginPhyOffset | 消息的最小偏移量(commitLog文件中的偏移量) |
| endPhyOffset | 消息的最大偏移量(commitLog文件中的偏移量) |
| hashSlotCount | hash槽个数 |
| indexCount | index条目当前已使用的个数 |
Hash槽存储的内容为落在该Hash槽内的Index的索引(看后面图示你就会很清楚了)
每个Index条目的格式如下
| 字段 | 解释 |
|---|---|
| hashcode | key的hashcode |
| phyoffset | 消息的偏移量(commitLog文件中的偏移量) |
| timedif | 该消息存储时间与第一条消息的时间戳的差值,小于0该消息无效 |
| pre index no | 该条目的前一条记录的Index索引,当hash冲突时,用来构建链表 |
key的组成有如下两种形式
- Topic#Unique Key
- Topic#Message Key
Unique Key是在producer端发送消息生成的
// DefaultMQProducerImpl#sendKernelImpl
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
Message Key是我们在发送消息的时候设置的哈,通常具有业务意义,方便我们快速查找消息
// 指定 topicName,tagName,MessageKey,消息内容,然后发送消息
String messageKey = UUID.randomUUID().toString();
Message message = new Message(TOPIC_NAME, TAG_NAME, messageKey,
("hello rocketmq " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.println(sendResult);
IndexFile构成过程比较麻烦,画图演示一下把,你可以把IndexFile想成基于文件实现的HashMap。
假如说往数组长度为10的HashMap依次放入3条key为11,34,21的数据,HashMap的结构如下
将key为11,34,21的数据放到IndexFile中的过程如下(假如hash槽的数量为10)
具体的过程为
- 将消息顺序放到Index条目中,将11放到index=1的位置(用index[1]表示哈),11%1=1,算出hash槽的位置是1,存的值是0(刚开始都是0,用hash[0]表示),将index[1].preIndexNo=hash[0]=0,hash[0]=1(1为index数组下标哈)
- 将34放到index[2],34%10=4,index[2].preIndexNo=hash[0]=0
- 将21放到index[3],21%10=1,index[3].preIndexNo=hash[1]=1
从图中可以看出来,当发生hash冲突时Index条目的preIndexNo属性充当了链表的作用。查找的过程和HashMap基本类似,先定位到槽的位置,然后顺着链表找就行了。
对具体算法感兴趣的可以看看源码,我就不贴代码了,有点多
// IndexFile的构建过程 org.apache.rocketmq.store.index.IndexFile#putKey // IndexFile的查找过程 org.apache.rocketmq.store.index.IndexFile#selectPhyOffset其他文件
除了上述三种文件外,在rocketmq store文件夹下还有如下几种其他文件
lock:有时候一台机器上会起多个broker,如果数据文件放在一个目录,这时候可以通过锁来提示你使用另一个目录,防止冲突
checkpoint:文件检查点,存储commitLog最后一次刷盘时间戳,consumeQueue最后一次刷盘时间戳,IndexFile最后一次刷盘时间戳
config:运行期间一些配置信息
abort:如果存在abort文件说明Broker非正常关闭,该文件默认启动时创建,正常退出时删除
[1]https://itzones.cn/2019/07/07/RocketMQ%E5%AD%98%E5%82%A8%E6%96%87%E4%BB%B6/



