死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?
“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:
-
消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。
-
消息在队列的存活时间超过设置的TTL时间。
-
消息队列的消息数量已经超过最大队列长度。
那么该消息将成为“死信”。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
代码实战 1、准备阶段这里我准备了一个SpringBoot基础环境代码,我会在此基础上进行集成,代码我已上传,地址如下:
SpringBootbase: SpringBoot基础项目框架
基础薄弱的同学,可以下载下来,跟我下面的步骤一步一步走,就可以出来效果了,再集成到你自己的项目中
代码下载完之后的目录结构是这样的:
2、添加相关依赖jar包在这个代码基础上,我们需要先引入RabbitMQ的依赖,在pom.xml中加入下面依赖:
3、yml文件添加相关配置org.springframework.boot spring-boot-starter-amqp
我们再application.yml文件中新增下面代码:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
# 发送确认
publisher-/confirm/is: true
# 路由失败回调
publisher-returns: true
template:
# 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
mandatory: true
listener:
simple:
# 每次从RabbitMQ获取的消息数量
prefetch: 1
default-requeue-rejected: false
# 每个队列启动的消费者数量
concurrency: 1
# 每个队列最大的消费者数量
max-concurrency: 1
# 签收模式为手动签收-那么需要在代码中手动ACK
acknowledge-mode: manual
还需要添加队列交换名称等信息,代码如下:
#死信消息模型 dead: exchange-dead: order.dead.exchange routing-dead-key: order.dead.routingKey queue-dead: order.dead.queue normal-queue: order.normal.queue normal-exchange: order.normal.exchange normal-routing-key: order.normal.routingKey #设置过期时间(单位:毫秒) expire: 3000
最后结构如下图:
各位一定要注意缩进哦,不然到时候有问题的
4、RabbitMQ配置类在config包下面新建RabbitmqConfig配置类,代码如下:
package org.wujiangbo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.wujiangbo.entity.RabbitMQProperties;
import java.util.HashMap;
import java.util.Map;
@Configuration
@Slf4j
public class RabbitmqConfig {
@Autowired
private RabbitMQProperties rabbitMQProperties;
//创建普通队列
@Bean
public Queue normalQueue() {
Map arguments = new HashMap<>(2);
// 设置死信参数
// 绑定死信交换机
arguments.put("x-dead-letter-exchange", rabbitMQProperties.getExchangeDead());
// 绑定死信的路由key
arguments.put("x-dead-letter-routing-key", rabbitMQProperties.getRoutingDeadKey());
return new Queue(
rabbitMQProperties.getNormalQueue(),
true,
false,
false,
arguments);
}
//创建普通交换机
@Bean
public Exchange normalExchange(){
return new TopicExchange(rabbitMQProperties.getNormalExchange(),true,false);
}
//绑定普通队列和普通交换机
@Bean
public Binding normalBind(){
return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(rabbitMQProperties.getNormalRoutingKey()).noargs();
}
//创建死信交换机
@Bean
public Exchange deadExchange(){
return new TopicExchange(rabbitMQProperties.getExchangeDead(),true,false);
}
//创建死信队列
@Bean
public Queue deadQueue(){
return new Queue(rabbitMQProperties.getQueueDead(),true,false,false);
}
//绑定死信队列与死信交换机
@Bean
public Binding realBindDead(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(rabbitMQProperties.getRoutingDeadKey()).noargs();
}
}
里面需要依赖两个实体类,代码如下:
package org.wujiangbo.entity;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
@ToString
@Data
public class OrderDto implements Serializable {
private Long id;//订单编号
private Long userId;//订单归属人ID
private Double orderMoney;//订单金额
private String orderStatus;//订单状态(0:待支付;1:已支付;2:未支付(超时))
}
还有一个RabbitMQProperties类,代码如下:
package org.wujiangbo.entity;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "dead")
@Data
public class RabbitMQProperties {
private String exchangeDead;//死信队列交换机
private String routingDeadKey;//死信队列路由键
private String queueDead;//死信队列名称
private String normalQueue;//正常队列名称
private String normalExchange;//正常交换机名称
private String normalRoutingKey;//正常路由键
private String expire;//过期时间
}
此时项目结果如下:
5、生产者代码
生产者ProducerTest类代码如下:
package org.colin.mq;
import lombok.extern.slf4j.Slf4j;
import org.colin.entity.OrderDto;
import org.colin.entity.RabbitMQProperties;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@Configuration
@EnableScheduling
@Slf4j
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitMQProperties rabbitMQProperties;
//模拟下单操作
@Scheduled(fixedDelay = 10 * 1000)//每隔X秒运行一次
public void addOrder() {
log.info("-------------------------------------开始下单啦-------------------------------------");
//测试数据
OrderDto dto = new OrderDto();
dto.setId(7L);
dto.setOrderMoney(900D);
dto.setUserId(14L);
dto.setOrderStatus("0");//订单状态(0:待支付;1:已支付;2:未支付(超时))
try {
//将订单消息发到MQ服务器中
rabbitTemplate.convertAndSend(
rabbitMQProperties.getNormalExchange(),
rabbitMQProperties.getNormalRoutingKey(),
dto, new MessagePostProcessor() {
//在消息发送之前应用于消息的处理器
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties messageProperties = message.getMessageProperties();
//设置过期时间TTL
messageProperties.setExpiration(String.valueOf(rabbitMQProperties.getExpire()));
//设置持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
} catch (Exception e) {
log.error("下单失败:{}", e);
}
}
}
该生产者代码我是用一个定时任务做的,定时每10秒钟发一次消息,模拟下单操作
6、消费者代码package org.wujiangbo.mq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import org.wujiangbo.entity.OrderDto;
import java.io.IOException;
import java.util.Map;
@Component
@Slf4j
public class ConsumerTest {
@RabbitListener(queues = {"${dead.queue-dead}"})
public void handleDeadMessage(OrderDto dto, Message message, Channel channel, @Headers Map headers) throws IOException {
log.info("接收到死信队列的消息:{}", dto.toString());
//回复ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
注意:
我的消费者类中,没有写方法去监听正常的队列,而是只写了一个方法监听死信队列,这样做的目的,就是为了模拟用户下单一直不支付的场景,那么正常队列中的消息就会一直不会被消费而长期存留在队列中,而我在yml中设置的超时时间是3秒,也就是说,订单消息在队列中3秒内没有消费者消费的话,那就会被路由到死信交换机中,从而进入死信队列中,那么我这里监听着死信队列,就可以拿到消息了,此时拿到的所有消息都是过期未支付的订单,就可以做业务处理了,比如修改数据库订单状态为未支付,然后将库存加回数据库,让其他用户也可以下单购买
7、测试启动项目,观察控制台:
从控制台可以看出,每次发消息3秒之后,监听着死信队列的消费者就可以拿到消息了, 因为消息在正常队列中存在3秒之后就到了过期时间,就会被自动路由到死信交换机中,从而抵达死信队列,那么就会被监听到消费掉
至此,测试成功了
完整代码地址:SpringBootRabbitMQ001: RabbitMQ死信队列测试
结束语这样的死信队列在很多实际业务场景都可以得到很好的应用,希望大家学到后,能灵活运用
最后总结了一张图:
-
大家如果还有任何疑问,可以留言,我会第一时间回复的
-
最后别忘点赞哦,非常感谢大家的支持与厚爱,我会不断分享更多干货给大家的哈



