延迟队列,顾名思义就是消息进入消息队列后不会被立即消费,而是等一段时间后消费,这个时间可以自己设定,比如用户登录5分钟后向其推送消息。
rabbitmq并未提供延时队列的直接实现,但是rabbitmq提供了死信队列和TTL(消息过期时间)的实现方式,我们可以借助死信队列+TTL到达实现延迟队列的目的。下面先实现死信队列和TTL。
实现TTL这里使用springboot实现TTL,通过ttl方法设置队列过期时间,配置类:
@Configuration
public class RabbitmqConfig {
public static final String EXCHANGE = "springboot-exchange";
public static final String QUEUE = "springboot-queue";
@Bean
public Exchange topicExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
}
@Bean
public Queue itemQueue() {
//过期时间设置为10000毫秒
return QueueBuilder.durable(QUEUE).ttl(10000).build();
}
@Bean
public Binding itemQueueExchange(@Qualifier("itemQueue") Queue queue, @Qualifier("topicExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("item.#").noargs();
}
}
也可以单独设置消息的过期时间,最终以时间短的为准。
@Test
void test01() {
MessagePostProcessor postProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//设置5秒后过期,队列是10秒后过期,消息过期时间比队列过期时间短,所以消息最终是5秒后过期
message.getMessageProperties().setExpiration("5000");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE, "item.insert", "商品新增, routing key为item.insert");
}
实现死信队列
当消息成为死信后,可以被重新发送到另一个交换机,与该交换机绑定的消息队列可以暂时存放该死信消息。一个消息成为死信可能是由于 队列消息长度到达限制, 消费者拒接消费消息,并且没把消息重新放入原目标队列,或者原队列消息到达超时时间还未被消费。
队列绑定死信交换机,只需要给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key,这里使用springboot实现死信队列。
配置类:
@Configuration
public class RabbitmqConfig {
public static String DEAD_EXCHANGE = "dead-exchange";
public static String DEAD_QUEUE = "dead-queue";
public static String NORMAL_EXCHANGE = "normal-exchange";
public static String NORMAL_QUEUE = "normal_queue";
@Bean
public Exchange deadExchange() {
return ExchangeBuilder.directExchange(DEAD_EXCHANGE).durable(true).build();
}
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE).build();
}
@Bean
public Binding deadExchangeQueue(@Qualifier("deadQueue") Queue queue, @Qualifier("deadExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("dead").noargs();
}
@Bean
public Exchange normalExchange() {
return ExchangeBuilder.directExchange(NORMAL_EXCHANGE).durable(true).build();
}
@Bean
public Queue normalQueue() {
// 设置过期时间和队列长度,并且设置 x-dead-letter-exchange 和 x-dead-letter-routing-key参数
Map map = new HashMap();
map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key", "dead");
return QueueBuilder.durable(NORMAL_QUEUE).ttl(10000).maxLength(10).withArguments(map).build();
}
@Bean
public Binding normalExchangeQueue(@Qualifier("normalQueue") Queue queue, @Qualifier("normalExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("normal").noargs();
}
}
测试:
@Test
public void testDlx() {
//1. 测试消息拒收,死信消息,需要在消费端拒收
//rabbitTemplate.convertAndSend(RabbitmqConfig.NORMAL_EXCHANGE,"normal","我是一条消息,我会死吗?");
//2. 测试长度限制后,消息死信
// for (int i = 0; i < 20; i++) {
// rabbitTemplate.convertAndSend(RabbitmqConfig.NORMAL_EXCHANGE, "normal", "我是一条消息,我会死吗?");
// }
//3. 测试过期时间,死信消息
rabbitTemplate.convertAndSend(RabbitmqConfig.NORMAL_EXCHANGE,"normal","我是一条消息,我会死吗?");
}
实现延迟队列
利用TTL+死信队列就可以轻松实现延迟队列了,正常队列设置过期时间,过期后放到死信队列,监听该死信队列的消费者便可消费。
上面配置好的死信队列就可以用来做延迟队列,使用测试中的第三个,即:
//3. 测试过期时间,死信消息
rabbitTemplate.convertAndSend(RabbitmqConfig.NORMAL_EXCHANGE,"normal","我是一条消息,我会死吗?");
该消息过期后,便会转到死信队列,消费端监听该死信队列:
@Component
public class MyListener implements ChannelAwareMessageListener {
@Override
@RabbitListener(queues = "dead-queue")
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("message: " + message);
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
e.printStackTrace();
channel.basicNack(deliveryTag, true, true);
}
Thread.sleep(1000);
}
}



