1.定义rabbitmq链接
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);//确认机制
// connectionFactory.setPublisherReturns(true);
//发布确认,template要求CachingConnectionFactory的publisher/confirm/is属性设置为true
return connectionFactory;
}
2.新建queue
@Bean
public Queue queueA() {
return new Queue(QUEUE_A, true); //队列持久
}
3.新建exchange
@Bean
public DirectExchange testDirectExchange() {
return new DirectExchange(EXCHANGE_A);
}
4.绑定路由
@Bean
public Binding binding() {
return BindingBuilder.bind(queueA()).to(testDirectExchange()).with(ROUTINGKEY_A);
}
5.生产者消息发送
@PostMapping("/sendA")
public String sendMessage() {
try {
Map messageMap = new HashMap<>();
messageMap.put("messageId",UUID.randomUUID().toString());
messageMap.put("messageData","测试信息");
messageMap.put("createTime", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
queueMessageService.send(messageMap, RabbitMqConfig.EXCHANGE_A, RabbitMqConfig.ROUTINGKEY_A);
return "success";
} catch (Exception e) {
log.error(""+e);
return "error";
}
}
@Override
public void send(Object message, String exchange, String queueRoutingKey) throws Exception {
//构建回调id为uuid
String callBackId = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(callBackId);
log.info("开始发送消息内容:{}",message.toString());
//发送消息到消息队列
rabbitTemplate.convertAndSend(exchange, queueRoutingKey, message, correlationId);
log.info("发送定制的回调id:{}",callBackId);
}
6.消费者读取队列
@RabbitListener(queues = RabbitMqConfig.QUEUE_A)
@RabbitHandler
public void consumeMessage(Message message){
log.info("收到的消息:{}",message);
}



