【尚硅谷】2021新版RocketMQ教程丨深度掌握MQ消息中间件
工作原理 消息生产producer获取nameserver的topic路由表 , 向指定的broker的queue发送消息
存储 消息写入一个broker中所有topic消息都在commitlog中 , 满1g后生成新的 , 文件名为第一条消息的空间大小偏移量 , 每条消息都会记录当前的消息偏移量 , comsumequeue , 生产者 , 消息长度 ;
向commitlog中记录queue索引的具体消息 , 再向topic中的comsumequeue写索引信息 , 索引信息记录了该条消息处于commitlog的偏移量 , 同一个queue中的消息的tag可能是不同的
消息读取consumer获取broker中queue当前消费位置, 计算offset , 像 broker提交 queue , queueoffset , tag信息 , 在对应的comsumequeue中找到索引, 再计算commitlog的实际偏移量获取实际消息体
性能零拷贝 , comsumequeue的cache顺序写读 , 页预读机制 , commitlog存放所有topic消息 , 存在随机读写
生产者发送给queue选择轮询 或者 选择当前rt最短的节点
轮询 , queue接受响应延迟 , 影响之后的消息发送 , 导致producer中消息积压;
最短rt , 可能消息都发送到一个queue中, 对应的消费者压力增大
indexfile针对包含key的消息创建的索引 , 索引文件名为创建的时间戳 , 指定时间条件查询消息;
每个indexfile记录了最多2000w个索引单元 , 每个索引单元记录了消息在commitlog的物理偏移量;
消息的消费 消费类型可选拉取 和 推送 , 推送consumer与queue维持长连接 , 有消息时, broker触发consumer的回调 , 通知comsumer拉取消息
消费模式广播 , 集群; 广播每个topic的消息都会发送给consumerGroup下的所有consumer, 消息进度在consumer端保存; 集群只会给同一consumerG的consumer发送消息 , 消费进度在broker端保存;
rebalance在集群消费模式下 , 消费者, queue数量变化 需要重新分配queue与consumer的对应关系 ;
这种情况会导致消费暂停 , 消息积压, 如果是异步提交ack , 还会导致消息的重复消费 ,
订阅关系的一致性同一个消费者组的消费者的配置必须完全相同 , 消费的主题 , 消费的tag
offset管理广播模式消费进度在consumer本地 ,
集群模式在broker , 为了保证rebalance后仍能定位到
consumer第一次消费时, 会提交offset , 一般为queue的最后消费位置, consumer消费完后提交offset , offfset会写到broker的offset记录文件中 ;
消费失败时, broker会为topic下的group创建一个重试队列, 重新消费 ;
offset同步提交需要broker收到consumer消费完成的信息再返回ack , 保证消费幂等性但会造成消费者阻塞 ;
异步提交增加吞吐量 , 仍会进行ack;
- 生产者端重复生产 : broker没有及时确认生产的消息 , 导致生产者再次发送相同消息;
- 消费者rebalance 或没有及时接受消费者消费的ack
使用业务中幂等唯一标识设置到key中 , 消费时确认缓存和db都不存在该标识在进行消费, 消费成功后 写db, 再写缓存;
消费堆积 与延迟消费堆积, 主要是消费逻辑影响 ,是否有io阻塞
消息的清理commitlog默认3天过期 , 磁盘使用率超标清理, 过期清理线清理过期文件 , 清理警戒线清理最老的文件 , 危险线禁止写入
rocketMq应用 启动启动nameserver ,启动broker , 启动控制台~
普通消息同步发送 , 生产者发送后阻塞 , 等待broker的ack, ack中包含生产者生成的msgid, broker设置的msgOffset , ack会出现刷盘超时, slave同步超时等;
异步发送, 生产者发送后不等待ack;
单项发送 , broker不ack;
全局有序
保证只有1个队列 , 由1个消费者消费 , 就能实现顺序消费 ;
分区有序
实现选择queue 策略, 将业务上有序的消息hash后与queue数量取模放入同一个queue, 比如订单号相同, 但状态不同的消息 , 消费指定去消费这个queue , 消费时判断key , 只消费属于自己逻辑的key; 每个消费key的逻辑都要单独使用一个consumerGroup, 才能在集群模式下实现效果;
实现消息延迟投递, 常用来超时订单状态判断 ;
rmq延时时长是默认的几个取值 , 只能在服务端修改 , 每个取值标识一个延迟等级;
生产消息时, 指定延迟等级, 为延迟索引+1 , broker判断是否有延迟设置 , 有就放入commitlog ,然后放到一个对应延迟等级的topic , broker内部有个scheduler服务, 会为每个延迟等级topic创建一个timetask , 来去定时检查每个topic中的第一条消息, 如果消息到期了, 就会像生产一条普通消息一样放到commitlog,topic中 ;



