- 一、什么是死信队列?
- 二、死信出现的情况
- 三、什么是TTL?
- 1. 队列设置TTL
- 2. 消息设置TTL
- 四、死信队列
- 1. 原理
- 2. 实现
一、什么是死信队列?
死信队列是RabbitMQ中的一种消息机制,死信消息会被RabbitMQ进行特殊处理,如果配置了死信队列,那么死信消息会被丢到死信交队列中,如果没有配置,那么死信消息将会被丢弃。
二、死信出现的情况三、什么是TTL?
- 消息被否定确认(使用channel.basicNack或channel.basicReject拒绝消息),并且不进行消息重新发送(将requeue属性设置为false)
- 消息在队列中的存活时间超过了设置的TTL时间(消息或者队列超过了设定的时间,消息都会进入死信队列)
- 消息队列中的消息已经超过队列的最大长度
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接受,但是过了这个消息之后消息将会被自动删除。RabbitMQ中可以对消息和队列设置TTL。目前两种方式可以设置
1. 队列设置TTL通过对队列设置TTL,使得队列中的所有消息都有一个统一的过期时间
@Bean
public Queue TestQueueTTL(){
// 设置消息的过期时间为7s
return QueueBuilder.durable("test_ttl_queue").withArgument("x-message-ttl",7000).build();
}
2. 消息设置TTL
通过对于每个消息单独设置过期时间,自由性更高
@GetMapping("/test/message/ttl")
public String TestMessageTTL(){
MessageProperties properties = new MessageProperties();
// 设置消息的过期时间为3sz`
properties.setExpiration("7000");
Message m = new Message("测试该消息经过7s之后是否会消失".getBytes(),properties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("TestFanoutExchange",null,m,correlationData);
return "已经成功发送TTL消息";
}
注意!!!
若将要发送到的消息队列中还有未被消费的非TTL消息,那么此时将这个单独设置的过期时间的消息发送过去,该消息不会到期消失,会成为一条永久消息(原因未知)
DLX,全称为dead-letter-exchange,也可以称之为死信交换机。当一个消息在一个队列中变为了死信消息,那么这个消息就可以被重新发送到另一个交换机中,这个绑定的交换机就被称之为死信交换机,与该交换机绑定的队列就被称之为死信队列。
- 生产者发送消息
@GetMapping("/test/message/ttl")
public String TestMessageTTL(){
MessageProperties properties = new MessageProperties();
Message m = new Message("发送一条死信消息".getBytes(),properties);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("TestFanoutExchange",null,m,correlationData);
return "已经成功发送TTL消息";
}
- 定义交换机和队列,并将死信交换机绑定到队列上,然后再讲死信交换机与死信队列相绑定
@Configuration
public class DirectRabbitConfig {
@Bean
public FanoutExchange TestFanoutExchange(){
return new FanoutExchange("TestFanoutExchange",true,false);
}
@Bean
public Queue TestQueueTTL(){
// 设置消息的过期时间为7s
Map map = new HashMap<>();
map.put("x-message-ttl",7000);
map.put("x-dead-letter-exchange","TestDLX");
map.put("x-dead-letter-routing-key","TestDLXRouting");
return QueueBuilder.durable("test_ttl_queue").withArguments(map).build();
}
@Bean
public Binding TestQueueTTLBinding(){
return BindingBuilder.bind(TestQueueTTL()).to(TestFanoutExchange());
}
@Bean
public DirectExchange TestDLX(){
return new DirectExchange("TestDLX",true,false);
}
@Bean
public Queue TestDLXQueue(){
return new Queue("TestDLXQueue",true);
}
@Bean
public Binding TestDLXBinding(){
return BindingBuilder.bind(TestDLXQueue()).to(TestDLX()).with("TestDLXRouting");
}
}
- 只定义一个消费者用来接收死信队列传过来的消息(因为对正常队列没有定义消费者,所以正常队列中的消息在到了过期时间之后就会自动转为死信消息,发往死信交换机中)
@Component
public class DirectReceiver {
@RabbitHandler
@RabbitListener(queues = "TestDLXQueue")
public void receive2(Message message, Channel channel) {
// 消息正常消费
System.out.println(message);
// 手动应答Broker
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
} catch (IOException e) {
e.printStackTrace();
}
}
}



