引入依赖
org.springframework.boot spring-boot-starter-amqp 2.2.10.RELEASE
配置文件
spring:
rabbitmq:
host: 192.168.137.128
port: 5672
username: root
password: root
死信队列
没有被及时消费的消息存放的队列,消息没有被消费主要有以下原因:
a.消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
b.TTL(time-to-live) 消息超时未消费
c.达到最大队列长度
消息变成死信后,会被重新投递(publish)到另一个交换机上(Exchange),这个交换机往往被称为DLX(dead-letter-exchange)“死信交换机”,然后交换机根据绑定规则转发到对应的队列上,监听该队列就可以被重新消费。
生产者–>发送消息–>交换机–>队列–>变成死信队列–>DLX交换机–>队列–>监听–>消费者
在声明普通queue时加入props
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key", "222");
要收到死信交换机的消息,要以此时指定的"x-dead-letter-routing-key"去指定死信队列的路由键
TTL(time-to-live) 消息超时未消费
更改生产者参数
设置生产者发送消息的TTL
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
channel.basicPublish(consumer01.NORMAL_EXCHANGE,"222",properties,("message"+i).getBytes(StandardCharsets.UTF_8));
}
达到最大队列长度
params.put("x-max-length",3); 设置普通队列的长度,当队列中充满积压消息时,消息就会被转到死信队列
消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
手动应答消息,并在拒绝应答消息将重新放回queue设为false channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
延时队列,用于存放需要在指定时间内被处理的元素
可以实现定时任务的功能
例如:
实现延时队列
创建如上图所示的队列和交换机
@Configuration
public class RabbitConfig {
public static final String DIRECT_EXCHANGE = "X";
public static final String QA_QUEUE = "QA";
public static final String QB_QUEUE = "QB";
public static final String DEAD_EXCHANGE = "Y";
public static final String DEAD_QUEUE = "QD";
@Bean
public DirectExchange direct_exchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
public DirectExchange dead_exchange(){
return new DirectExchange(DEAD_EXCHANGE);
}
//自定义过期时间的消息发送
@Bean
public Queue ttl_queue(){
HashMap map = new HashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable("ttl_queue").withArguments(map).build();
}
@Bean
public Queue dead_queue(){
return QueueBuilder.durable(DEAD_QUEUE).build();
}
@Bean
public Queue qa_queue(){
return QueueBuilder.durable(QA_QUEUE)
.ttl(10000)
.deadLetterExchange(DEAD_EXCHANGE)
.deadLetterRoutingKey("YD")
.build();
}
@Bean
public Queue qb_queue(){
return QueueBuilder.durable(QB_QUEUE)
.ttl(40000)
.deadLetterRoutingKey("YD")
.deadLetterExchange(DEAD_EXCHANGE)
.build();
}
@Bean
public Binding queueBindingA(@Qualifier("direct_exchange")DirectExchange directExchange, @Qualifier("qa_queue")Queue a, @Qualifier("qb_queue")Queue b){
return BindingBuilder.bind(a).to(directExchange).with("XA");
}
@Bean
public Binding queueBindingB(@Qualifier("direct_exchange")DirectExchange directExchange, @Qualifier("qb_queue")Queue b){
return BindingBuilder.bind(b).to(directExchange).with("XB");
}
@Bean
public Binding queueBindingTTL(@Qualifier("direct_exchange")DirectExchange directExchange, @Qualifier("ttl_queue")Queue b){
return BindingBuilder.bind(b).to(directExchange).with("XTTL");
}
@Bean
public Binding queueBindingDead(@Qualifier("dead_exchange")DirectExchange directExchange,@Qualifier("dead_queue")Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("YD");
}
}
用于消费延迟消息的死信队列
@Component
public class consumer {
@RabbitListener(queues = "QD")
public void consumeDead(Message message, Channel channel){;
String msg = message.getBody().toString();
System.out.println("消费来自死信队列的消息"+msg+" "+new Date().toString());
}
}
自定义时间的延迟队列
也可以采用QC,实现一个自定义延迟时长的延迟队列,但是会有一个问题
RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。这就导致第二个消息设置的过期时间没有意义了
安装延时队列插件来解决这个问题:
插件安装地址
将插件放到rabbitmq的plugins目录下,例如:/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.8/plugins
开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange插件实现:通过交换机延迟指定时间
实现代码:
@Bean public Queue delayedQueue() { return QueueBuilder.durable(DELAYED_QUEUE_NAME).build(); } //自定义交换机 我们在这里定义的是一个延迟交换机 @Bean public CustomExchange delayedExchange() { Mapargs = new HashMap<>(); //自定义交换机的类型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); }



