一、TTL介绍二、原始API三、springboot案例
3.1 pom.xml添加依赖3.2.application.yml添加rabbitmq连接信息3.3.主启动类3.4 RabbitConfig类3.5 测试类
3.5.1 整个队列的消息设置过期时间3.5.2局部设置消息过期
在京东下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内用户没有支付,则默认订单取消。
TTL,Time to Live 的简称,即过期时间。
RabbitMQ 可以对消息和队列两个维度来设置TTL。
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。
目前有两种方法可以设置消息的TTL。
- 通过Queue属性设置,队列中所有消息都有相同的过期时间。对消息自身进行单独设置,每条消息的TTL 可以不同。
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”也是可以被取出来消费
二、原始API try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 创建队列(实际上使用的是AMQP default这个direct类型的交换器)
// 设置队列属性
Map arguments = new HashMap();
// 设置队列的TTL
arguments.put("x-message-ttl", 30000);
// 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
arguments.put("x-expires", 10000);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
for (int i = 0; i < 1000000; i++) {
String message = "Hello World!" + i;
channel.basicPublish("", QUEUE_NAME, new
AMQP.BasicProperties().builder().expiration("30000").build(), message.getBytes()
);
System.out.println(" [X] Sent '" + message + "'");
}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
注意理解 x-message-ttl、 x-expires这两个参数的区别,有不同的含义。但是这两个参数属性都遵循上面的默认规则。一般TTL相关的参数单位都是毫秒(ms)
三、springboot案例 3.1 pom.xml添加依赖3.2.application.yml添加rabbitmq连接信息org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test test
spring:
application:
name: springboot_rabbitmq_ttl
rabbitmq:
host: 139.9.132.132
virtual-host: /
username: admin
password: 123
port: 5672
3.3.主启动类
@SpringBootApplication
public class RabbitmqttlApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqttlApplication.class, args);
}
}
3.4 RabbitConfig类
@Configuration
public class RabbitConfig {
@Bean
public Queue queueTTLWaiting() {
Map props = new HashMap<>();
//对于该队列中的消息,设置都等待10s
props.put("x-message-ttl", 10000);
Queue queue = new Queue("q.pay.ttl-waiting", false, false, false, props);
return queue;
}
@Bean
public Exchange exchangeTTLWaiting() {
DirectExchange exchange = new DirectExchange("ex.pay.ttl-waiting", false, false);
return exchange;
}
@Bean
public Binding bindingTTLWaiting() {
return BindingBuilder.bind(queueTTLWaiting()).to(exchangeTTLWaiting()).with("pay.ttl-waiting").noargs();
}
@Bean
public Queue queueWaiting() {
Queue queue = new Queue("q.pay.waiting", false, false,
false);
return queue;
}
@Bean
public Exchange exchangeWaiting() {
DirectExchange exchange = new DirectExchange("ex.pay.waiting", false, false);
return exchange;
}
@Bean
public Binding bindingWaiting() {
return BindingBuilder.bind(queueWaiting()).to(exchangeWaiting()).with("pay.waiting").noargs();
}
}
3.5 测试类
3.5.1 整个队列的消息设置过期时间
@Autowired
private AmqpTemplate rabbitTemplate;
@Test
void sendMessage() {
rabbitTemplate.convertAndSend(
"ex.pay.ttl-waiting",
"pay.ttl-waiting",
"发送了TTL-WAITING-MESSAGE");
System.out.println("queue-ttl-ok = ");
}
结果
发送消息时
10s后
@Test
void sendTTLMessage() throws UnsupportedEncodingException {
MessageProperties properties = new MessageProperties();
properties.setExpiration("5000");
Message message = new Message("发送了WAITING-MESSAGE" .getBytes("utf-8"), properties);
rabbitTemplate.convertAndSend("ex.pay.waiting", "pay.waiting", message);
System.out.println("msg-ttl-ok = ");
}
结果
发送消息时
5s后



