RabbitMQ配置
常量申明TTL交换机申明TTL队列TTL队列绑定到TTL交换机声明死信交换机声明死信队列死信队列绑定死信交换机并关联路由键配置解读运作逻辑说明
固定延迟演示动态延迟演示
在项目中,我们经常会遇到需要进行延迟的场景比如 延迟计算,延迟重试,延迟关闭订单等等,延迟的技术方案多种多样,我这里列举RabbitMq进行延迟的方案之一:TTL+死信
public static final String CHECK_POST_TTL_EXCHANGE = "check-post-ttl-exchange";
public static final String CHECK_POST = "check-post";
public static final String CHECK_POST_TTL_QUEUE = "check-post-ttl-queue";
public static final String CHECK_POST_DEAD_EXCHANGE = "check-post-dead-exchange";
public static final String CHECK_POST_DEAD_QUEUE = "check-post-dead-queue";
申明TTL交换机
实际就是一个普通的队列,我这里定义为fanout类型
@Bean
public FanoutExchange checkPostTtlExchange() {
return new FanoutExchange(CHECK_POST_TTL_EXCHANGE);
}
申明TTL队列
可定义统一的消息存活时间,死信交换机的名字,死信交换机的路由键
@Bean
public Queue checkPostTtlQueue() {
Map args = new HashMap<>(8);
// 消息存活时间 ,key 固定 value 必须为Int值,此种方式则消息存活时间固定
args.put("x-message-ttl", 10000);
//设置死信交换机,value为死信交换机的名字
args.put("x-dead-letter-exchange", CHECK_POST_DEAD_EXCHANGE);
//设置死信 routing_key,value为死信路由键的名字
args.put("x-dead-letter-routing-key", CHECK_POST);
return new Queue(CHECK_POST_TTL_QUEUE, true, false, false, args);
}
TTL队列绑定到TTL交换机
@Bean
Binding ttlBind() {
return BindingBuilder.bind(checkPostTtlQueue()).to((checkPostTtlExchange()));
}
声明死信交换机
死信交换机实际也就是一个普通的交换机,我们这里需要将其申明为直连类型交换机,需要结合路由键一起使用
@Bean
DirectExchange checkPostDeadExchange() {
return new DirectExchange(CHECK_POST_DEAD_EXCHANGE, true, false);
}
声明死信队列
死信队列实际也是一个普通的队列
@Bean
Queue checkPostDeadQueue() {
return new Queue(CHECK_POST_DEAD_QUEUE, true, false, false);
}
死信队列绑定死信交换机并关联路由键
@Bean
Binding dlxBinding() {
return BindingBuilder.bind(checkPostDeadQueue()).to(checkPostDeadExchange()).with(CHECK_POST);
}
配置解读
我们定义的TTL交换机,实际就是一个普普通通的交换机,与其绑定的TTL队列额外加入了配置参数x-dead-letter-exchange 申明了死信交换机的名字,配置参数x-dead-letter-routing-key 申明了死信交换机的路由键
定义的死信交换机本质上是一个普通的直连交换机,交换机名字需与TTL队列中配置属性x-dead-letter-exchange指定的值一致,死信队列则是一个普通的队列,其使用的路由键需与TTL队列中配置属性dead-letter-routing-key指定的值一致
运作逻辑说明我们将需要延迟的消息发至TTL交换机中,TTL交换机将消息发送至绑定的TTL队列,且我们不设置消费者去监听这个TTL队列的消息,当消息在TTL队列中存活指定的时间后(上方TTL队列中x-message-ttl属性)消息将再次发送到x-dead-letter-exchange申明的死信交换机中,如果有队列(我们称之为死信队列)绑定了死信交换机且路由键为TTL队列中配置的x-dead-letter-routing-key,则会接收到消息,我们只监听对应的死信队列消息,就完成了消息的延迟
固定延迟演示固定延迟无法根据消息选择延迟时间,适合延迟时间统一的场景,重点在于TTL队列申明中,配置属性x-message-ttl
上方配置后,消息将固定延迟10s
@Bean
public Queue checkPostTtlQueue() {
Map args = new HashMap<>(8);
// 消息存活时间 ,key 固定 value 必须为Int值,此种方式则消息存活时间固定
args.put("x-message-ttl", 10000);
//设置死信交换机,value为死信交换机的名字
args.put("x-dead-letter-exchange", CHECK_POST_DEAD_EXCHANGE);
//设置死信 routing_key,value为死信路由键的名字
args.put("x-dead-letter-routing-key", CHECK_POST);
return new Queue(CHECK_POST_TTL_QUEUE, true, false, false, args);
}
private void addDisableScheduleTimelyConfig(Integer configId, int expirationTime) {
final long start = System.currentTimeMillis();
int finalExpirationTime = expirationTime + 30000;
rabbitTemplate.convertAndSend(RabbitMqConfig.CHECK_POST_TTL_EXCHANGE, "", configId.toString(), message -> {
// 由于TTL队列中申明了x-message-ttl 为10000ms,下方延迟时间配置不会生效
message.getMessageProperties().setExpiration((Integer.toString(finalExpirationTime)));
message.getMessageProperties().setContentEncoding("UTF-8");
System.out.println("延迟消息发送时间:" + LocalDateTime.now());
return message;
});
}
死信队列消费者
@RabbitListener(queues = RabbitMqConfig.CHECK_POST_DEAD_QUEUE)
public void flushCheckConfigState(Message message) {
try {
System.out.println("死信队列消费者接受:" + LocalDateTime.now());
} catch (Exception exception) {
exception.printStackTrace();
log.error("处理死信消息失败:{}",new String(message.getBody()));
}
}
动态延迟演示
动态延迟即延迟消息不固定,可为每一条消息设置延迟时间
需要去除TTL队列申明中的中x-message-ttl配置属性
@Bean
public Queue checkPostTtlQueue() {
Map args = new HashMap<>(4);
//设置死信交换机
args.put("x-dead-letter-exchange", CHECK_POST_DEAD_EXCHANGE);
//设置死信 routing_key
args.put("x-dead-letter-routing-key", CHECK_POST);
return new Queue(CHECK_POST_TTL_QUEUE, true, false, false, args);
}
发送消息指明延迟时间,我这一条消息是延迟了90000ms,即一分半
private void addDisableScheduleTimelyConfig(Integer configId, int expirationTime) {
int finalExpirationTime = expirationTime + 30000;
rabbitTemplate.convertAndSend(RabbitMqConfig.CHECK_POST_TTL_EXCHANGE, "", configId.toString(), message -> {
// 由于TTL队列中申明了x-message-ttl 为10000ms,下方延迟时间配置不会生效
message.getMessageProperties().setExpiration((Integer.toString(finalExpirationTime)));
message.getMessageProperties().setContentEncoding("UTF-8");
System.out.println("延迟消息发送时间:" + LocalDateTime.now());
return message;
});
}
发现从消息发送,到消息接收,中间时间间隔90000ms左右,说明延迟消息生效



