- application.properties
spring.rabbitmq.host=192.168.3.202
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
- MyConfirmConfig
package cn.cnyasin.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyConfirmConfig {
// 交换机
public static final String EXCHANGE_ConFIRM = "exchange_/confirm/i";
// 队列
public static final String QUEUE_ConFIRM = "queue_/confirm/i";
// 路由key
public static final String ROUTING_ConFIRM = "routing_/confirm/i";
// 声明交换机
@Bean
public DirectExchange exchangeConfirm() {
return ExchangeBuilder.directExchange(EXCHANGE_/confirm/i).build();
}
// 声明队列
@Bean
public Queue queueConfirm() {
return QueueBuilder.durable(QUEUE_/confirm/i).build();
}
// 绑定队列交换机路由key
@Bean
public Binding queueConfirmBindingExchange/confirm/i(
@Qualifier("queue/confirm/i") Queue queue/confirm/i,
@Qualifier("exchange/confirm/i") DirectExchange exchangeConfirm
) {
return BindingBuilder.bind(queue/confirm/i).to(exchange/confirm/i).with(ROUTING_/confirm/i);
}
}
- MyConfirmCallback
package cn.cnyasin.rabbit.component;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Date;
@Slf4j
@Component
public class MyConfirmCallback implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("[*] [{}] 确认回调成功,回调ID:{}", new Date().toString(), correlationData.getId());
} else {
log.info("[*] [{}] 确认回调失败,回调ID:{},失败原因:{}", new Date().toString(), correlationData.getId(), cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
int code = returnedMessage.getReplyCode();
String text = returnedMessage.getReplyText();
byte[] message = returnedMessage.getMessage().getBody();
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
log.info("[*] [{}] 消息未送达队列回调,错误码code:{}。原因:{}。消息:{}。交换机:{}。路由key:{}",
new Date().toString(), code, text, new String(message), exchange, routingKey);
}
}
- MyConfirmConsumer01
package cn.cnyasin.rabbit.consumer;
import cn.cnyasin.rabbit.config.MyConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class My/confirm/iConsumer01 {
@RabbitListener(queues = My/confirm/iConfig.QUEUE_/confirm/i)
public void receiveMessage(String message) {
log.info("[*] [{}] 发布确认高级-消费者01 接收到消息:{}", new Date().toString(), message);
}
}
- MyConfirmController
package cn.cnyasin.rabbit.controller;
import cn.cnyasin.rabbit.config.MyConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
 * HTTP entry point for the publisher-confirm demo. A single request publishes
 * three messages that differ only in their target exchange / routing key.
 */
@Slf4j
@RestController
@RequestMapping("/confirm")
public class MyConfirmController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes three variants of {@code msg}, each with its own correlation id:
     * <ol>
     *   <li>id "1": the exchange and routing key taken from {@code MyConfirmConfig};</li>
     *   <li>id "2": the exchange name with "2" appended, i.e. a name that
     *       {@code MyConfirmConfig} does not declare;</li>
     *   <li>id "3": the declared exchange with "3" appended to the routing key, i.e. a
     *       key that {@code MyConfirmConfig} does not bind.</li>
     * </ol>
     * Variants 2 and 3 are presumably there to trigger the confirm-failure and
     * returned-message callbacks respectively.
     *
     * @param msg message text taken from the request path
     * @return the literal string "OK" once all three messages have been handed to the template
     */
    @RequestMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        log.info("[*] [{}] 准备发送消息:{}", new Date(), msg);

        // 1) Declared exchange and routing key.
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(
                MyConfirmConfig.EXCHANGE_ConFIRM, MyConfirmConfig.ROUTING_ConFIRM, msg + 1, correlationData);

        // 2) Deliberately different exchange name.
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(
                MyConfirmConfig.EXCHANGE_ConFIRM + 2, MyConfirmConfig.ROUTING_ConFIRM, msg + 2, correlationData2);

        // 3) Declared exchange, deliberately different routing key.
        CorrelationData correlationData3 = new CorrelationData("3");
        rabbitTemplate.convertAndSend(
                MyConfirmConfig.EXCHANGE_ConFIRM, MyConfirmConfig.ROUTING_ConFIRM + 3, msg + 3, correlationData3);

        return "OK";
    }
}