所谓的死信队列,也就是我们说的延迟队列。其实现方式就是给普通队列绑定一个所谓的死信队列,给消息设置一个过期时间,在该时间内如果消息没有被消费,那么则会进入死信队列。我这里以Direct模式处理,fanout模式也是类似
下面开始整活。(前提是你已经会基本使用springboot+rabbitmq)
org.springframework.boot spring-boot-starter-amqp
spring:
rabbitmq:
host: 127.0.0.1 #mq服务器ip,默认为localhost
port: 5672 #mq服务器port,默认为5672
username: guest #mq服务器username,默认为gust
password: guest #mq服务器password,默认为guest
virtual-host: /
#publisher-/confirm/is: true
#消费消息的时候,就必须手动ack确认,不然消息永远还在队列中
listener:
simple:
acknowledge-mode: manual
direct:
acknowledge-mode: manual
基本配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DeadConfig {
private String normalQueue="normal_queue";
private String normalExchange="normal_exchange";
private String normalRoutingKey="normal_routingkey";
private String deadQueue="dead_queue";
private String deadExchange="dead_exchange";
private String deadRoutingKey="dead_routingkey";
@Bean
Queue normalQueue(){
// return new Queue(normalQueue);
Map map = new HashMap<>();
map.put("x-dead-letter-exchange",deadExchange);
map.put("x-dead-letter-routing-key", deadRoutingKey);
return new Queue(normalQueue,true,false,false,map);
}
@Bean
DirectExchange normalExchange(){
return new DirectExchange(normalExchange);
}
@Bean
Binding normalBindingExchange(){
return BindingBuilder.bind( normalQueue()).to(normalExchange()).with(normalRoutingKey);
}
@Bean
Queue deadQueue(){
return new Queue(deadQueue);
}
@Bean
DirectExchange deadExchange(){
return new DirectExchange(deadExchange);
}
@Bean
Binding deadBindingExchange(){
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(deadRoutingKey);
}
}
消息生产者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
@RestController
@Slf4j
public class RabbitProduct {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/123")
public void mysendDelayMessage() {
List list = Arrays.asList("123", "456", "789", "899");
log.info("发送时间:{},发送内容:{}", LocalDateTime.now(), list.toString());
this.rabbitTemplate.convertAndSend(
"normal_exchange",
"normal_routingkey",
list,
message -> {
message.getMessageProperties().setExpiration("3000");
return message;
}
);
}
}
普通队列取消费消息的时候,死信是接收不到的,由于我们这里有对应的队列去消费,所以死信这边是无法获取到消息的
当我们把普通消息队列注释掉之后,也就是不让其消费,等时间到期之后就会转到死信队列中,通过时间差我们就能看出来效果
消费者代码
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;
@Component
@Slf4j
public class RabbitConsumer {
@RabbitListener(queues = "normal_queue")
public void normal_queue(List list, Message message, Channel channel) throws IOException {
log.info("正常队列收到消息时间为:{},收到的消息内容为:{}", LocalDateTime.now(), list.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = "dead_queue")
public void myDealy(List list, Message message, Channel channel) throws IOException {
log.info("死信收到消息时间为:{},收到的消息内容为:{}", LocalDateTime.now(), list.toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}



