栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ的死信队列

RabbitMQ的死信队列

文章目录
  • 一、什么是死信队列?
  • 二、死信出现的情况
  • 三、什么是TTL?
    • 1. 队列设置TTL
    • 2. 消息设置TTL
  • 四、死信队列
    • 1. 原理
    • 2. 实现


一、什么是死信队列?

死信队列是RabbitMQ中的一种消息机制,死信消息会被RabbitMQ进行特殊处理,如果配置了死信队列,那么死信消息会被丢到死信交队列中,如果没有配置,那么死信消息将会被丢弃。

二、死信出现的情况
  1. 消息被否定确认(使用channel.basicNack或channel.basicReject拒绝消息),并且不进行消息重新发送(将requeue属性设置为false)
  2. 消息在队列中的存活时间超过了设置的TTL时间(消息或者队列超过了设定的时间,消息都会进入死信队列)
  3. 消息队列中的消息已经超过队列的最大长度
三、什么是TTL?

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接受,但是过了这个消息之后消息将会被自动删除。RabbitMQ中可以对消息和队列设置TTL。目前两种方式可以设置

1. 队列设置TTL

通过对队列设置TTL,使得队列中的所有消息都有一个统一的过期时间

 @Bean
    public Queue TestQueueTTL(){
        // 设置消息的过期时间为7s
        return QueueBuilder.durable("test_ttl_queue").withArgument("x-message-ttl",7000).build();
    }
2. 消息设置TTL

通过对于每个消息单独设置过期时间,自由性更高

@GetMapping("/test/message/ttl")
    public String TestMessageTTL(){
        MessageProperties properties = new MessageProperties();
        // 设置消息的过期时间为3sz`
        properties.setExpiration("7000");
        Message m = new Message("测试该消息经过7s之后是否会消失".getBytes(),properties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("TestFanoutExchange",null,m,correlationData);
        return "已经成功发送TTL消息";
    }

注意!!!
若将要发送到的消息队列中还有未被消费的非TTL消息,那么此时将这个单独设置的过期时间的消息发送过去,该消息不会到期消失,会成为一条永久消息(原因未知)

四、死信队列 1. 原理

DLX,全称为dead-letter-exchange,也可以称之为死信交换机。当一个消息在一个队列中变为了死信消息,那么这个消息就可以被重新发送到另一个交换机中,这个绑定的交换机就被称之为死信交换机,与该交换机绑定的队列就被称之为死信队列。

2. 实现
  1. 生产者发送消息
 @GetMapping("/test/message/ttl")
    public String TestMessageTTL(){
        MessageProperties properties = new MessageProperties();
        Message m = new Message("发送一条死信消息".getBytes(),properties);
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("TestFanoutExchange",null,m,correlationData);
        return "已经成功发送TTL消息";
    }
  1. 定义交换机和队列,并将死信交换机绑定到队列上,然后再讲死信交换机与死信队列相绑定
@Configuration
public class DirectRabbitConfig {

    @Bean
    public FanoutExchange TestFanoutExchange(){
        return new FanoutExchange("TestFanoutExchange",true,false);
    }
    
    
    @Bean
    public Queue TestQueueTTL(){
        // 设置消息的过期时间为7s
        Map map = new HashMap<>();
        map.put("x-message-ttl",7000);
        map.put("x-dead-letter-exchange","TestDLX");
        map.put("x-dead-letter-routing-key","TestDLXRouting");
        return QueueBuilder.durable("test_ttl_queue").withArguments(map).build();
    }

    @Bean
    public Binding TestQueueTTLBinding(){
        return BindingBuilder.bind(TestQueueTTL()).to(TestFanoutExchange());
    }


     
    @Bean
    public DirectExchange TestDLX(){
        return new DirectExchange("TestDLX",true,false);
    }

    
    @Bean
    public Queue TestDLXQueue(){
        return new Queue("TestDLXQueue",true);
    }

    @Bean
    public Binding TestDLXBinding(){
        return BindingBuilder.bind(TestDLXQueue()).to(TestDLX()).with("TestDLXRouting");
    }
}

  1. 只定义一个消费者用来接收死信队列传过来的消息(因为对正常队列没有定义消费者,所以正常队列中的消息在到了过期时间之后就会自动转为死信消息,发往死信交换机中)
@Component
public class DirectReceiver {

    @RabbitHandler
    @RabbitListener(queues = "TestDLXQueue")
    public void receive2(Message message, Channel channel) {
        // 消息正常消费
        System.out.println(message);

        // 手动应答Broker
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/457057.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号