死信,就是由于某些原因无法被正常消费的消息死信队列,就是用来存储死信的队列。死信交换机,实际上就是一个普通的direct类型交换机应用场景:
因为没有被正常消费的消息被路由到了死信队列,这保证了消息数据不丢失,相当于在持久化、发布-确认的基础上又加了一层保护。订单创建后在指定时间未支付时自动失效(要结合延迟队列来实现)
死信的来源
消息 TTL 过期:TTL是Time To Live的缩写, 也就是生存时间队列达到最大长度:队列满了,无法再添加数据到 mq 中消息被拒绝:(basic.reject 或 basic.nack) 并且 requeue=false 生产者核心代码(原生代码)
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//设置消息的 TTL 时间 10s
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
//该信息是用作演示队列个数限制
for (int i = 1; i < 11; i++) {
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes());
System.out.println("生产者发送消息:" + message);
}
正常消费者核心代码
//正常队列绑定死信队列信息 Map死信消费者核心代码params = new HashMap<>(); //正常队列设置死信交换机 参数 key 是固定值 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); //正常队列设置死信 routing-key 参数 key 是固定值 params.put("x-dead-letter-routing-key", "lisi"); //设置正常队列的长度限制,例如发10个,4个则为死信 params.put("x-max-length", 6); //正常队列 channel.queueDeclare(normalQueue, false, false, false, params); channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); if (message.equals("info5")) { System.out.println("Consumer 接收到消息" + message + "并拒绝签收该消息"); //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); } else { System.out.println("Consumer 接收到消息" + message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; //拒绝消息的前提是开启手动应答 channel.basicConsume(normalQueue, false, deliverCallback, consumerTag -> {});
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Dead Consumer 接收到消息" + message);
};
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});
延迟队列
概念
就是用来存放需要在指定时间被处理的元素的队列使用场景:
订单在十分钟之内未支付则自动取消新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒用户注册成功后,如果三天内没有登陆则进行短信提醒用户发起退款,如果三天内没有得到处理则通知相关运营人员预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议 基于死信的延迟队列
该方式是通过消息在延迟队列中TTL到期后,路由到死信队列来实现的消息延迟处理
该方式需要为不同的TTL时间的消息创建不同的延迟队列,比较麻烦
架构图(创建两个TTL时间不同的延迟队列QA、QB,消息到期后路由到同一个死信队列QD):
配置类核心代码(SpringBoot方式)
//声明队列A 并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA() {
return QueueBuilder.durable(QUEUE_A).deadLetterExchange(Y_DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey("YD")
.ttl(10000).build();//设置队列中消息的ttl
}
// 队列A 绑定X交换机
@Bean
public Binding queueaBindingX(Queue queueA, DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
生产者核心代码:
@GetMapping("sendMsg/{message}")
public void sendMsg(@PathVariable String message) {
rabbitTemplate.convertAndSend("X", "XA", "消息来自 ttl 为 10S 的队列: " + message);
}
消费者核心代码:
@RabbitListener(queues = "QD")
public void receiveD(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date(), msg);
}
基于插件的延迟队列(推荐)
MQ默认无法实现同一个队列延迟不同TTL的消息
该方式通过自定义交换机,实现了一个通用的延迟队列
下载安装步骤:
rabbitmq_delayed_message_exchange 插件下载地址(注意版本兼容性)放置到 RabbitMQ 的插件目录sbin目录下执行命令:rabbitmq-plugins enable rabbitmq_delayed_message_exchange重启MQ
架构图(只需要一个普通队列,关键在于这个交换机):
配置类核心代码:
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map args = new HashMap<>();
args.put("x-delayed-type", "direct");//自定义交换机的类型
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingDelayedQueue(Queue queue, CustomExchange delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
生产者核心代码:
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY,
message,
correlationData -> {
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
消费者核心代码:
@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveDelayedQueue(Message message) {
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date(), msg);
}
完整代码
完整代码:GitHub



