1、1 简介 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。 1、2 消息成为死信的三种情况: 队列消息长度到达限制; 消费者拒接消费消息,并且不重回队列; 原队列存在消息过期设置,消息到达超时时间未被消费;
2、1 代码实现
// DLX
// 先定义正常的队列和交换机
public static final String TEST_DLX_EXCHANGE_NAME = "test_dlx_exchange";
public static final String TEST_DLX_QUEUE_NAME = "test_dlx_queue";
// 定义死信队列
public static final String DLX_EXCHANGE_NAME = "dlx_exchange";
public static final String DLX_QUEUE_NAME = "dlx_queue";
@Bean("SpringBootDLXExchange")
public Exchange DLXExchange(){
return ExchangeBuilder.topicExchange(DLX_EXCHANGE_NAME).durable(true).build();
}
@Bean("SpringBootDLXQueue")
public Queue DLXQueue() {
return QueueBuilder.durable(DLX_QUEUE_NAME).build();
}
@Bean
public Binding bindDLXQueueTOExchange(@Qualifier("SpringBootDLXQueue") Queue queue, @Qualifier("SpringBootDLXExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("dlx.#").noargs();
}
@Bean("SpringBootDLXTestExchange")
public Exchange DLXTestExchange(){
return ExchangeBuilder.topicExchange(TEST_DLX_EXCHANGE_NAME).durable(true).build();
}
@Bean("SpringBootDLXTestQueue")
public Queue DLXTestQueue() {
Map map = new HashMap<>();
// 设置队列的过期时间
map.put("x-message-ttl",10000);
// 设置队列的长度限制
map.put("x-max-length",10);
// x-dead-letter-exchange:死信交换机名称
map.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);
// x-dead-letter-routing-key:发送给死信交换机的routingkey
map.put("x-dead-letter-routing-key","dlx.hello");
return QueueBuilder.durable(TEST_DLX_QUEUE_NAME).withArguments(map).build();
}
@Bean
public Binding bindDLXTestQueueTOExchange(@Qualifier("SpringBootDLXTestQueue") Queue queue, @Qualifier("SpringBootDLXTestExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("test.dlx.#").noargs();
}
2、2 消费者代码:
@Component
public class DLXListener {
@RabbitListener(queues = "test_dlx_queue")
public void TopicQueueListener(Message msg,Channel channel) throws Exception {
long deliveryTag = msg.getMessageProperties().getDeliveryTag();
try {
Thread.sleep(2000);
// 1、接受转换的消息
System.out.println(new String(msg.getBody()));
// 2、业务逻辑处理
// 手动异常
int i = 3 / 0;
System.out.println("业务逻辑处理中-------------");
// 3、手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
// 4、接受失败策略
channel.basicNack(deliveryTag,true,false);
}
}
}
2、3 测试:
@SpringBootTest
@RunWith(SpringRunner.class)
public class DLXProducerTest {
// 1、注入RabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDlx(){
// 1. 测试过期时间,死信消息
// rabbitTemplate.convertAndSend("test_dlx_exchange","test.dlx.hello","我是一条消息,我会死吗?");
// 2. 测试长度限制后,消息死信
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend("test_dlx_exchange","test.dlx.hello","我是一条消息,我会死吗?");
// }
// 3. 测试消息拒收
rabbitTemplate.convertAndSend("test_dlx_exchange","test.dlx.haha","我是一条消息,我会死吗?");
}
}
效果如下:
1、测试过期时间
2、测试长度限制后,消息死信
3、测试消息拒收
项目代码链接:https://github.com/Mbm7280/rabbitmq_demo



