栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

天池MQ比赛总结

天池MQ比赛总结

1. 比赛题目分析

https://tianchi.aliyun.com/competition/entrance/531922/information

主要分为两阶段 性能测评 和 正确性测评。

  • 消息必须落盘 会做断电重新测试
  • topic之间不存在并发操作问题
  • 队列特别多
  • 傲腾【读写效率和内存差距不大】

如何更有效率的存储数据

缓存设计

2.存储方案

RokcetMq 和 Kafka 都是利用文件系统存储消息的消息中间件。因此本次比赛的设计方式也是借鉴了Kafka 和RocketMq 的方式去存储数据。

2.1 Kafka存储【3000S】

Kafka如何存储数据:

如下图所示为Kafka存储数据的目录,具体的文件夹名称就是 topic + partitionNum

数据文件:

上述四个文件分别为 索引文件、数据文件、时间索引文件、leader-epoch(副本数据复制使用的)【每个分区都至少有这么多的文件】可见Kafka 每个分区都会创建很多文件的(所以Kafka也不支持Topic 和分区数量特别多的场景,因为创建的文件也是超级多可能会OOM或者 open file 失败的情况,而且如果分区特别多 数据比较少的情况,Kafka的吞吐量优势也是体现不出来的)

//Kafka 每个Segment 对应的文件后缀
object Log {
  val LogFileSuffix = ".log"
  val IndexFileSuffix = ".index"
  val TimeIndexFileSuffix = ".timeindex"
  val ProducerSnapshotFileSuffix = ".snapshot"
  val TxnIndexFileSuffix = ".txnindex"
  val DeletedFileSuffix = ".deleted"
  val CleanedFileSuffix = ".cleaned"
  val SwapFileSuffix = ".swap"
  val CleanShutdownFile = ".kafka_cleanshutdown"
  val DeleteDirSuffix = "-delete"
  val FutureDirSuffix = "-future"
……
}

索引文件:

Kafka 存储索引文件的时候选择的是稀疏索引的类型,【查找的时候使用的是二分查找的方式】所以 上边说的那种场景,稀疏索引文件的优势也就体现不出来了,因为众所周知二分查找更适合数据量偏大的场景。

存储索引项的源码:

//如果添加的数据量大于了indexIntervalBytes 添加一个索引项 默认 4Kb
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
  //添加索引文件数据
  offsetIndex.append(largestOffset,physicalPosition)
  //对应时间戳索引 
  tieIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)            				
  bytesSinceLastIndexEntry = 0   
 } 
bytesSinceLastIndexEntry += records.sizeInBytes
2.2 rocketMq存储的方式【3000S】

RocketMq如何存储数据:

需要注意的是RocketMQ的所有Topic 的数据是存储在一个文件中的,文件的大小限制默认是1G和Kafka一致。存满重新创建一个文件。【消息的顺序性得到保障】

而针对消费者,为了提升消费速率,RocketMq有针对每个queueId 存储了一个记录磁盘地址的文件,想当与Kafka 的索引文件,只不过是全量索引文件。

2.3 rocketMq 和 Kafka 方式的对比

1.分区队列特别多?

这次比赛是100个Topic 和5000个队列,实际算下就是有 500000 个queue。明显不适合Kakfa,其实针对与 RocketMq也快达到瓶颈了。但是明显mq的方案比Kafka的方案更加适合

2.mmap 和fileChannel?

网上看搜集的资料可以总结为:

mmap 写小数据非常快,读大数据比较快,因为直接将文件映射到内存,就相当于直接从内存读取数据,但是mmap去映射一个文件不能超过1.5GB,而且关于mmap操作的释放处理非常麻烦。具体的原理还没看太明白,有带继续研究?

fileChannel 在 聚合数据后的读写效率不低于mmap切操作简单,所以选择了filechannel 进行操作。

2.3 添加傲腾【优化200S】 3.优化 3.1 线程聚合 减少force的次数【1480S】

如上图所示摒弃了每个线程单独写的情况,因为正确性验证是断电的情况,所以每条数据在返回给生产者写入完成前必须要force磁盘。因为fileChannel 写小数据 force的效率极低。所以选择了线程聚合写数据的方式。

是个线程聚合一批数据写磁盘,

3.2 unsafe copy buffer

数据 写入和读取的流程

堆内 -> 堆外 -> PageCache -> 磁盘

磁盘-> PageCache -> 堆外 -> 堆内

   public static final Unsafe unsafe = getUnsafe();

    static sun.misc.Unsafe getUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void copyMemoryHeapTononHeap(byte[] heapBytes, long address) {
        unsafe.copyMemory(heapBytes, 16, null, address, heapBytes.length);
    }
3.3 fileChannel 的4kb对齐

fileChannel 两次分别写入 3KB 和 10Kb的数据,每次都需要force,实际上等于写入了多少数据?

fileChannel 的在force 的情况下 会存在 4Kb对齐的情况。

如下所示第一次写入 3Kb数据,第二次写入9Kb数据,实际上写入的数据 = 3 + 1 + 1 + 9 + 3

4KB对齐代码:

        int length = buffer.limit();
        int mod = length % Constant_4KB;

        if (mod % Constant_4KB != 0) {
            length = length + mod;
        }
        ByteBuffer byteBuffer = ByteBuffer.allocate(length);
        byteBuffer.put(buffer);
        byteBuffer.flip();

实际测试数据 使用3Kb写1GB的数据 和使用 4Kb写1.5G的数据,后者反而用时更短。

3.4 聚合索引和数据【1500->770】

如上图所示,为了减少force 的次数,而且又必须满足断电重启后数据不丢失。将索引数据和消息存入同一个文件,并且将索引数据全都缓存再堆内内存中(减少查询读盘的次数),每一条索引都需要记录 topic 、queueId,消息的屋里地址,和消息的大小等信息。并且索引的最后一个模块指向下组消息的首地址。recover 的时候恢复内存数据使用。

3.5 热数据 文件方式写傲腾

使用 fileChannel 往傲腾存在的目录去写数据 就跟写文件一样。因为傲腾是60G热数据只有50G,空间完全够用,不必考虑傲腾满了的情况。

3.6预分配文件

预分配文件,因为数据量是一定的,而且计算时间是在,第一条消息append 的时候才开始的。

所以程序的初始化要把所以需要创建的对象都创建出来,尽量避免运行过程中去创建对象。还可以先创建出来50g的文件并且用0进行填充,再写入数据的时候效率明显有所提升。

4最终方案 3.1最终方案【600s】

5. 不足
  • 缓存设计不足【没有充分利用傲腾的属性】
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/457523.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号