RocketMQ采用文件存储的,在$HOME/store目录下,有如下文件:
commitLog: 消息存储目录config: 运行期间一些配置信息consumerqueue: 消息消费队列存储目录index: 消息索引文件存储目录abort: 如果存在该文件,则Broker非正常关闭checkpoint: 文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间、index索引文件最后一次刷盘时间戳。 消息的存储架构
RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。
消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。每条消息等待前面4个字节存储这条消息的总长度。
单个文件大小默认1G,起始偏移量为0,第二个文件的起始偏移量为1073741824。消息主要是顺序写入日志文件,当第一个文件满了后,再写入下一个文件;
indexFile 和 ComsumerQueue 中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个 CommitLog 文件上。
ConsumeQueue消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。
Consumer可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。
同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;
IndexFileIndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:KaTeX parse error: Undefined control sequence: store at position 6: HOME ̲s̲t̲o̲r̲e̲index{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
零拷贝与 MMAP 传统数据传送机制基于传统的IO方式,底层实际上通过调用read()和write()来实现。
通过read()把数据从硬盘读取到内核缓冲区,再复制到用户缓冲区;然后再通过write()写入到socket缓冲区,最后写入网卡设备。
整个过程发生了4次用户态和内核态的上下文切换和4次拷贝,具体流程如下:
- 用户进程通过read()方法向操作系统发起调用,此时上下文从用户态转向内核态DMA控制器把数据从硬盘中拷贝到读缓冲区CPU把读缓冲区数据拷贝到应用缓冲区,上下文从内核态转为用户态,read()返回用户进程通过write()方法发起调用,上下文从用户态转为内核态CPU将应用缓冲区中数据拷贝到socket缓冲区DMA控制器把数据从socket缓冲区拷贝到网卡,上下文从内核态切换回用户态,write()返回
那么什么又是DMA拷贝呢?
因为对于一个IO操作而言,都是通过CPU发出对应的指令来完成,但是相比CPU来说,IO的速度太慢了,CPU有大量的时间处于等待IO的状态。
因此就产生了DMA(Direct Memory Access)直接内存访问技术,本质上来说他就是一块主板上独立的芯片,通过它来进行内存和IO设备的数据传输,从而减少CPU的等待时间。
零拷贝零拷贝技术是指计算机执行操作时,CPU不需要先将数据从某处内存复制到另一个特定区域,这种技术通常用于通过网络传输文件时节省CPU周期和内存带宽。
零拷贝并非真的是完全没有数据拷贝的过程,只不过是减少用户态和内核态的切换次数以及CPU拷贝的次数。
整个过程发生了4次用户态和内核态的上下文切换和3次拷贝,具体流程如下:
- 用户进程通过mmap()方法向操作系统发起调用,上下文从用户态转向内核态DMA控制器把数据从硬盘中拷贝到读缓冲区上下文从内核态转为用户态,mmap调用返回用户进程通过write()方法发起调用,上下文从用户态转为内核态CPU将读缓冲区中数据拷贝到socket缓冲区DMA控制器把数据从socket缓冲区拷贝到网卡,上下文从内核态切换回用户态,write()返回
mmap()是在
代码示例:
```public class MmapCopy {
public static String path = "/Users/huzhenyang";
public static void main( String[] args ) throws Exception
{
//映射的文件
File file1 = new File(path, "1");
//映射文件的fileChannel对象(操作文件,)
FileChannel fileChannel = new RandomAccessFile(file1, "rw").getChannel();
//fileChanne 定义了map方法,MMAP的映射 1k
MappedByteBuffer mmap = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, 10240);
// 向mmap 写入数据
mmap.put("da-yang".getBytes());
//刷新写入磁盘
mmap.flip();
byte[] bb = new byte[7];
//从mmap中读取文件
mmap.get(bb,0,7);
System.out.println(new String(bb));
//解除MMAP
unmap(mmap);
}
private static void unmap(MappedByteBuffer bb) {
Cleaner cl = ((DirectBuffer)bb).cleaner();
if (cl != null)
cl.clean();
}
}
RocketMQ 中 MMAP 运用
如果按照传统的方式进行数据传送,那肯定性能上不去,所以 RocketMQ 使用的是 MMAP。
RocketMQ 一个映射文件是commitlog 文件,默认大小为 lG。
这里需要注意的是,采用 MappedByteBuffer 这种内存映射的方式有几个限制,其中之一是,一次只能映射 1.5~2G 的文件至用户态的虚拟内存,这也是 为何 RocketMQ 默认设置单个 CommitLog 日志数据文件为 1G 的原因了。
在rocketMQ源码的store中,在MappedFile类中进行了内存映射
进入init方法:



