import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MQConfig {
private static final String EXCHANGE = "test_exchange";
private String DELAY_QUEUE = "delay_queue";
private String DELAY_QUEUE_ROUTING_KEY = "delay.queue.routing.key";
private String DEAD_LETTER_QUEUE = "dead_letter_queue";
private String DEAD_LETTER_QUEUE_ROUTING_KEY = "dead.letter.queue.routing.key";
@Bean
Exchange exchange() {
return new TopicExchange(EXCHANGE);
}
@Bean
Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE_ROUTING_KEY)
.withArgument("x-dead-letter-exchange", EXCHANGE)
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_ROUTING_KEY)
.build();
}
@Bean
Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
Binding delayQueueBinding() {
return BindingBuilder.bind(delayQueue())
.to(exchange())
.with(DELAY_QUEUE_ROUTING_KEY)
.noargs();
}
@Bean
Binding deadLetterQueueBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(exchange())
.with(DEAD_LETTER_QUEUE_ROUTING_KEY)
.noargs();
}
}
(2)发送者
public class RabbitMQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String exchange, String routingKey, String message, String delayMilli) {
MessagePostProcessor postProcessor = (msg) -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
msg.getMessageProperties().setExpiration(delayMilli);
return msg;
};
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);
}
}
2.插件实现
设置了过期时间的消息,并不一定在过期后里面失效,而是等处理到该消息后,判断其失效时才会进入死信队列,所以需要安装插件来支持消息及时过期。
https://www.rabbitmq.com/community-plugins.html
安装 rabbitmq_delayed_message_exchange 插件后即可实现。
(1)下载插件进入 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/plugins 目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.0/plugins wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
然后再用
rabbitmq-plugins list
就能看到刚才下载的插件了
(2)启用插件[root@mq-test plugins]# rabbitmq-plugins enable rabbitmq_delayed_message_exchange Enabling plugins on node rabbit@mq-test: rabbitmq_delayed_message_exchange The following plugins have been configured: rabbitmq_delayed_message_exchange rabbitmq_management rabbitmq_management_agent rabbitmq_tracing rabbitmq_web_dispatch Applying plugin configuration to rabbit@mq-test... The following plugins have been enabled: rabbitmq_delayed_message_exchange(3)重启 RabbitMQ
rabbitmqctl stop rabbitmq-server restart(4)配置
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MqConfiguration {
@Bean
CustomExchange delayExchange() {
Map args = new HashMap<>(1);
args.put("x-delayed-type", "direct");
return new CustomExchange("delay_exchange", "x-delayed-message", true, false, args);
}
@Bean
Queue delayQueue() {
return new Queue("delay_queue");
}
@Bean
Binding delayBinding() {
return BindingBuilder.bind(delayQueue())
.to(delayExchange())
.with("delay.queue.routing.key")
.noargs();
}
}
(5)发送者
public class RabbitMQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String exchange, String routingKey, String message, int delayMilli) {
MessagePostProcessor postProcessor = (msg) -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
msg.getMessageProperties().setDelay(delayMilli);
return msg;
};
rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);
}
}



