栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RocketMQ-延迟消息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RocketMQ-延迟消息

前言

延迟消息对于RocketMQ来说实现这个操作挺简单的,不用像RabbitMQ那样需要设置死信队列,简化了客户端操作,但是对于RocketMQ开源版(也就是RocketMQ)的是不支持自定义延迟时间的,只能使用RocketMQ提供的1-18个延迟级别,但是阿里云ONS是可以设置自定义时间的,收费版的还是牛逼!

延迟消息实现原理

首先生产者发送一条消息费Broker,那么发送一条延迟消息是需要设置延迟等级的,然后这条消息会被写入到CommitLog中,那么写入到CommitLog中后会分发到相应的Queue中,那么在投递给Queue时,会先判断这条消息是否携带了延迟等级,如果没有设置延迟等级,那么直接进入5的流程,正常投递给对应的Topic下的Queue中,如果有的话那么就会修改Topic为SCHEDULE_TOPIC前缀,然后根据延迟等级,在ConsumerQueue目录中SCHEDULE_TOPIC_XXX主题下创建对应的QueueID目录与ConsumerQueue文件,(如果没有SCHEDULE_TOPIC_XXX那么就自动创建),然后修改消息索引单元中的Message Tag HashCode部分原本存放的消息的Tag的Hash值,现修改为消息的投递时间,投递时间是指该消息被从新修改为原Topic后再次被写入到commitLog中的时间,(投递时间=消息存储时间+延迟等级时间),消息存储时间指的是消息被发送到Broker时的时间戳,然后将消息索引写入到SCHEDULE_TOPIC_XXX主题下对应的ConsumerQueue中

投递延迟消息

Broker内部有一个延迟消息的服务类,其会消费SCHEDULE_TOPIC_XXXX中的消息,即按照每条消息投递时间,将延迟消息投递到目标Topic中,不过,在投递之前会从CommitLog中将原来写入的消息再次读出,并将其原来的延迟等级设置为0,即原消息变为一条不延迟的普通消息,然后再次投递到目标Topic中

ScheduleMessageService在Broker启动时,会创建并启动一个定时器Timer,用于执行相应的定时任务,系统会根据延迟等级的个数,定义相应数量的TimerTask,每个TimerTask负责一个延迟等级消息的消息投递,每个TimerTask都会检测相应Queue队列的第一条消息是否到期,若第一条消息未到期,则后面所有消息更不会到期,(消息是按投递时间排序的),若第一条消息到期了,则该消息投递到目标Topic,即消费该消息

将消息从新写入commitLog

延迟消息服务类ScheduleMessageService将延迟消息再次发送给commitLog,并再次形成新的消息索引条目,分发到对应的Queue中

这其实就是一次普通消息发送,只不过这次消息的Producer是延迟消息服务类ScheduleMessageService

就会投递到Topic为SCHEDULE_TOPIC开头的Queue中去,那么这时发送的消息就不会再有延迟了,会直接写入到目标Topic下的消费者队列中去的,那么消费者就订阅消费的

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/677530.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号