栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ通过死信队列实现延迟消息

RabbitMQ通过死信队列实现延迟消息

死信队列&死信交换器:DLX 全称(Dead-Letter-Exchange),称之为死信交换器,当消息变成一个死信之后,如果这个消息所在的队列存在x-dead-letter-exchange参数,那么它会被发送到x-dead-letter-exchange对应值的交换器上,这个交换器就称之为死信交换器,与这个死信交换器绑定的队列就是死信队列。

死信消息:

消息被拒绝(Basic.Reject或Basic.Nack)并且设置 requeue 参数的值为 false
消息过期了
队列达到最大的长度
过期消息:

在 rabbitmq 中存在2种方可设置消息的过期时间,第一种通过对队列进行设置,这种设置后,该队列中所有的消息都存在相同的过期时间,第二种通过对消息本身进行设置,那么每条消息的过期时间都不一样。如果同时使用这2种方法,那么以过期时间小的那个数值为准。当消息达到过期时间还没有被消费,那么那个消息就成为了一个 死信 消息。

队列设置:在队列申明的时候使用 x-message-ttl 参数,单位为 毫秒

单个消息设置:是设置消息属性的 expiration 参数的值,单位为 毫秒

延时队列:在rabbitmq中不存在延时队列,但是我们可以通过设置消息的过期时间和死信队列来模拟出延时队列。消费者监听死信交换器绑定的队列,而不要监听消息发送的队列。

参考链接:https://blog.csdn.net/fu_huo_1993/article/details/88350188

添加消息队列的枚举配置类QueueEnum:

@Data
public enum QueueEnum {

    
    QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),

    
    QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");

    
    private String exchange;

    
    private String name;

    
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }

}

添加RabbitMQ的配置

@Configuration
public class RabbitMqConfig {

    
    @Bean
    public DirectExchange orderDirect() {
        return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    
    @Bean
    public DirectExchange orderTtlDirect() {
        return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    
    @Bean
    public Queue orderQueue() {
        return new Queue (QueueEnum.QUEUE_ORDER_CANCEL.getName());
    }

    
    @Bean
    public Queue orderTtlQueue() {
        return  QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName())
                    .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())
                    .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())
                    .build();
    }

    
    @Bean
    public Binding orderBinding(DirectExchange orderDirect, Queue orderQueue){
        return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());
    }

    
    @Bean
    public Binding orderBinding(DirectExchange orderTtlDirect, Queue orderTtlQueue){
        return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());
    }

}

添加延迟消息的发送者CancelOrderSender

@Component
public class CancelOrderSender {

    private static Logger LOGGER = LoggerFactory.getLogger(CancelOrderSender.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId, final long delayTimes){
        //给延迟队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }
        });
    }
}

添加取消订单消息的接收者CancelOrderReceiver:用于从取消订单的消息队列(mall.order.cancel)里接收消息。

@Component
@RabbitListener(queues = "mall.order.cancel")
public class CancelOrderReceiver {

    private static Logger LOGGER = LoggerFactory.getLogger(CancelOrderReceiver.class);

    @Autowired
    private OmsPortalOrderService portalOrderService;

    @RabbitHandler
    public void handle(Long orderId){

        LOGGER.info("receive delay message orderId:{}", orderId);

        portalOrderService.cancelOrder(orderId);
    }

}

参考链接:https://mp.weixin.qq.com/s/Rp4TfejQkYN00oQ-kMCRcg

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711391.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号