地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/tags
下载的插件放到rabbitmq安装目录的plugins里面,重启rabbitmq就可以了。
声明交换机和队列import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayedQueueConfig {
//交换机
private static final String DELAYED_EXCHANGE = "delayed_exchange";
//队列
private static final String DELAYED_QUEUE="delayed_queue";
//routinKey
private static final String ROUTINKEY="routinkey";
@Bean("delayedExchange")
CustomExchange delayedExchange(){
Map map = new HashMap<>();
map.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,map);
}
@Bean("delayedQueue")
Queue delayedQueue(){
return new Queue(DELAYED_QUEUE);
}
@Bean
Binding queueBindingExcehang(@Qualifier("delayedExchange") CustomExchange delayedExchange,@Qualifier("delayedQueue") Queue delayedQueue){
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(ROUTINKEY).noargs();
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
@RequestMapping("/ttl")
public class sendMessgeController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg3/{msg}/{now}")
public void sendMesg3(@PathVariable String msg,@PathVariable int now){
Date date = new Date();
log.info("当前时间:{},发送一条消息给两个ttl队列:{}",date ,msg);
rabbitTemplate.convertAndSend("delayed_exchange","routinkey",msg,ms->{
ms.getMessageProperties().setDelay(now);
return ms;
});
}
}
消费者
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class QueueConsumer {
@RabbitListener(queues = "delayed_queue")
void receiveDelayed(Message msg){
String str = new String(msg.getBody());
log.info("收到延迟队列消息,当前时间{},消息为{}", new Date(),str);
}
}
最后看效果
请求两次看看最终效果
http://127.0.0.1:8080/ttl/sendMsg3/one/20000 20秒
http://127.0.0.1:8080/ttl/sendMsg3/two/2000 2秒



