相关常量:
// Shared RabbitMQ queue / exchange / routing-key names used by the delay-queue setup.
// Declared final so they are true constants and cannot be reassigned at runtime.
// NOTE(review): the values "immediateqqueue" and "immediaterouring" look misspelled, and
// IMMEDIATE_EXCHANGE is valued "immediatequeue". They are kept byte-for-byte because the
// consumer's @RabbitListener hard-codes "immediateqqueue" and the names may already exist
// on the broker - rename them together if they are ever cleaned up.

// Queue that holds messages until their TTL expires.
public static final String DELAY_QUEUE = "delayqueue";
// Queue the consumer listens on; receives the dead-lettered (expired) messages.
public static final String IMMEDIATE_QUEUE = "immediateqqueue";
// Exchange that expired messages are dead-lettered to.
public static final String IMMEDIATE_EXCHANGE = "immediatequeue";
// Routing key carried by dead-lettered messages.
public static final String IMMEDIATE_ROUTING_KEY = "immediaterouring";
// Routing key that binds the delay queue to the producer-facing exchange.
public static final String DELAY_ROUTING_KEY = "delayrouting";
// Exchange the producer publishes delayed messages to.
public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
rabbitmq配置类
package com.csrcb.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
/**
 * Spring configuration for RabbitMQ: builds the connection factory, the {@link RabbitTemplate},
 * the JSON message converter, and declares the direct exchanges, durable queues and bindings
 * used by the application.
 *
 * <p>Connection settings are injected from the {@code spring.rabbitmq.*} properties.
 */
@Configuration
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    public static final String EXCHANGE_A = "my-mq-direct_exchange";
    public static final String EXCHANGE_B = "my-mq-exchange_B";
    public static final String EXCHANGE_C = "my-mq-exchange_C";

    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_A_FAIL = "QUEUE_A_FAIL";
    public static final String QUEUE_B = "QUEUE_B";

    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_A_FAIL = "spring-boot-routingKey_A_FAIL";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";

    /**
     * Creates the connection factory (a connection cache, comparable to a database
     * connection pool) from the injected host, port, credentials and virtual host.
     *
     * <p>Publisher confirms are switched on; the template requires this property of the
     * {@link CachingConnectionFactory} to be {@code true} for confirm callbacks to work.
     * Publisher returns are not enabled here.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true); // confirm mechanism
        return connectionFactory;
    }

    /**
     * Entry point for sending messages to RabbitMQ. Uses the JSON message converter and has
     * the mandatory flag set.
     *
     * <p>Declared with prototype scope, so the container creates a new template each time the
     * bean is requested. NOTE(review): the original comment insists on prototype scope,
     * presumably so that each user can register its own confirm callback - confirm.
     *
     * @return a new template bound to {@link #connectionFactory()}
     */
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(jsonMessageConverter());
        template.setMandatory(true);
        return template;
    }

    // ---- Exchanges -------------------------------------------------------------------------

    /** Direct exchange named {@link #EXCHANGE_A}. */
    @Bean
    public DirectExchange testDirectExchange() {
        return new DirectExchange(EXCHANGE_A);
    }

    /** Direct exchange named {@link #EXCHANGE_B}. */
    @Bean
    public DirectExchange testDirectExchangeB() {
        return new DirectExchange(EXCHANGE_B);
    }

    /**
     * Direct exchange named {@link #EXCHANGE_C}.
     *
     * <p>Previously this was declared with {@link #QUEUE_B} as its name, which created an
     * exchange called "QUEUE_B" and left {@code EXCHANGE_C} unused.
     */
    @Bean
    public DirectExchange testDirectExchangeC() {
        return new DirectExchange(EXCHANGE_C);
    }

    // ---- Queues ----------------------------------------------------------------------------

    /** Durable queue {@link #QUEUE_A}. */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); // durable
    }

    /** Durable queue {@link #QUEUE_A_FAIL}. */
    @Bean
    public Queue queueAFail() {
        return new Queue(QUEUE_A_FAIL, true); // durable
    }

    /** Durable queue {@link #QUEUE_B}. */
    @Bean
    public Queue queueB() {
        return new Queue(QUEUE_B, true); // durable
    }

    // ---- Bindings --------------------------------------------------------------------------
    // One exchange can be bound to several queues, so a message published to a single
    // exchange can be distributed to different queues depending on its routing key.

    /** Binds {@link #QUEUE_A} to {@link #EXCHANGE_A} with routing key {@link #ROUTINGKEY_A}. */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queueA()).to(testDirectExchange()).with(ROUTINGKEY_A);
    }

    /**
     * Binds {@link #QUEUE_A_FAIL} to {@link #EXCHANGE_A} with routing key
     * {@link #ROUTINGKEY_A_FAIL}.
     */
    @Bean
    public Binding bindingAFail() {
        return BindingBuilder.bind(queueAFail()).to(testDirectExchange()).with(ROUTINGKEY_A_FAIL);
    }

    /** Binds {@link #QUEUE_B} to {@link #EXCHANGE_B} with routing key {@link #ROUTINGKEY_B}. */
    @Bean
    public Binding bindingB() {
        return BindingBuilder.bind(queueB()).to(testDirectExchangeB()).with(ROUTINGKEY_B);
    }

    // ---- Conversion ------------------------------------------------------------------------

    /** Converter that (de)serializes message payloads as JSON via Jackson. */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
package com.csrcb.config;
import java.util.HashMap;
import java.util.Map;
import com.csrcb.common.Constants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queues, exchanges and bindings for a delay queue built on message expiry
 * plus dead-lettering.
 *
 * <p>Topology declared here:
 * <ul>
 *   <li>{@code DEAD_LETTER_EXCHANGE} --{@code DELAY_ROUTING_KEY}--> {@code DELAY_QUEUE}</li>
 *   <li>{@code DELAY_QUEUE} dead-letters to {@code IMMEDIATE_EXCHANGE} with
 *       {@code IMMEDIATE_ROUTING_KEY}</li>
 *   <li>{@code IMMEDIATE_EXCHANGE} --{@code IMMEDIATE_ROUTING_KEY}--> {@code IMMEDIATE_QUEUE}</li>
 * </ul>
 */
@Configuration
public class delayqueueConfig {

    /**
     * Creates the queue that is consumed immediately.
     *
     * @return a durable queue named {@code Constants.IMMEDIATE_QUEUE}
     */
    @Bean
    public Queue immediateQueue() {
        // First argument is the queue name, second is whether the queue is durable.
        return new Queue(Constants.IMMEDIATE_QUEUE, true);
    }

    /**
     * Creates the delay queue, whose dead letters are forwarded to the immediate exchange.
     *
     * @return a durable, non-exclusive, non-auto-delete queue named
     *         {@code Constants.DELAY_QUEUE} carrying the dead-letter arguments
     */
    @Bean
    public Queue delayQueue() {
        // Typed map instead of a raw Map: the Queue constructor takes Map<String, Object>.
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange: name of the exchange dead letters of this queue are sent to.
        params.put("x-dead-letter-exchange", Constants.IMMEDIATE_EXCHANGE);
        // x-dead-letter-routing-key: routing key those dead letters carry when forwarded.
        params.put("x-dead-letter-routing-key", Constants.IMMEDIATE_ROUTING_KEY);
        // Arguments: name, durable, exclusive, autoDelete, arguments.
        return new Queue(Constants.DELAY_QUEUE, true, false, false, params);
    }

    /**
     * Creates the exchange that the immediate queue is bound to.
     *
     * @return a durable, non-auto-delete direct exchange named
     *         {@code Constants.IMMEDIATE_EXCHANGE}
     */
    @Bean
    public DirectExchange immediateExchange() {
        // DirectExchange has three constructors: (name), (name, durable, autoDelete), and
        // (name, durable, autoDelete, arguments) where the map holds custom exchange arguments.
        return new DirectExchange(Constants.IMMEDIATE_EXCHANGE, true, false);
    }

    /**
     * Creates the exchange that the delay queue is bound to.
     *
     * @return a durable, non-auto-delete direct exchange named
     *         {@code Constants.DEAD_LETTER_EXCHANGE}
     */
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(Constants.DEAD_LETTER_EXCHANGE, true, false);
    }

    /**
     * Binds the immediate queue to the immediate exchange.
     *
     * @return the binding using {@code Constants.IMMEDIATE_ROUTING_KEY}
     */
    @Bean
    public Binding immediateBinding() {
        return BindingBuilder.bind(immediateQueue())
                .to(immediateExchange())
                .with(Constants.IMMEDIATE_ROUTING_KEY);
    }

    /**
     * Binds the delay queue to the {@code DEAD_LETTER_EXCHANGE} exchange.
     *
     * @return the binding using {@code Constants.DELAY_ROUTING_KEY}
     */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue())
                .to(deadLetterExchange())
                .with(Constants.DELAY_ROUTING_KEY);
    }
}
生产者controller
/**
 * Handles POST {@code /senddeadqueue}: hands a fixed test payload to the queue message
 * service together with a delay value of 20000.
 */
@PostMapping("/senddeadqueue")
public void sendMessagefanout11111() {
    final String payload = "fa song si xin dui lie";
    final long delay = 20000L;
    this.queueMessageService.senddelayqueue(payload, delay);
}
/**
 * Publishes {@code user} to {@code Constants.DEAD_LETTER_EXCHANGE} with routing key
 * {@code Constants.DELAY_ROUTING_KEY}, setting the message's expiration to {@code time}.
 * A fresh random UUID is attached as correlation data so the publisher-confirm callback
 * can identify the message.
 *
 * <p>NOTE(review): per the RabbitMQ TTL documentation, a message with a per-message TTL is
 * only expired once it reaches the head of the queue, so a short delay queued behind a
 * longer one may be delivered late - confirm this is acceptable for callers.
 *
 * @param user the message payload
 * @param time the delay in milliseconds; must be non-null and non-negative
 * @throws IllegalArgumentException if {@code time} is {@code null} or negative
 */
public void senddelayqueue(String user, Long time) {
    // Fail fast: String.valueOf on a null Long yields the literal "null", and a negative
    // value is not a valid expiration either; both would only fail later at the broker.
    if (time == null || time < 0) {
        throw new IllegalArgumentException(
                "time must be a non-negative delay in milliseconds, but was: " + time);
    }
    log.info("消息已经发送,时间为:{}", new Timestamp(System.currentTimeMillis()));
    String callBackId = UUID.randomUUID().toString();
    CorrelationData correlationId = new CorrelationData(callBackId);
    this.rabbitTemplate.convertAndSend(
            Constants.DEAD_LETTER_EXCHANGE,
            // routingKey
            Constants.DELAY_ROUTING_KEY,
            user,
            message -> {
                // Set the delay (expiration) in milliseconds.
                message.getMessageProperties().setExpiration(String.valueOf(time));
                return message;
            },
            correlationId);
}
/**
 * Publisher-confirm callback: logs the callback id and whether the send was acknowledged.
 *
 * @param correlationData correlation data supplied when the message was sent; may be
 *                        {@code null} when a message was published without it
 * @param ack             {@code true} if the send was acknowledged, {@code false} otherwise
 * @param s               the failure cause reported on a negative acknowledgement
 */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
    // Guard against a missing CorrelationData instead of throwing a NullPointerException
    // inside the callback.
    String callbackId = (correlationData != null) ? correlationData.getId() : null;
    log.info(" 回调id:{}", callbackId);
    if (ack) {
        log.info("消息发送成功");
    } else {
        log.info("消息发送失败:{}", s);
    }
}
消费者消费
/**
 * Listener for the "immediateqqueue" queue: logs every message it receives.
 *
 * @param message the received message
 */
@RabbitListener(queues = "immediateqqueue")
@RabbitHandler
public void consumeMessagedelaymessage(Message message) {
    final Message received = message;
    log.info("收到的消息fouantC:{}", received);
}
pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>



