这里以direct类型为例
application.yml
server:
port: 8000
#session-timeout: 1800
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
publisher-returns: true # 发送者开启 return 确认机制
publisher-/confirm/i-type: correlated # 发送者开启 confirm 确认机制
pom.xml:
4.0.0 org.springframework.boot spring-boot-starter-parent2.3.7.RELEASE com.rabbitmq /confirm/i0.0.1-SNAPSHOT /confirm/i Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starterorg.springframework.boot spring-boot-starter-testtest org.springframework.boot spring-boot-starter-amqpjunit junitorg.projectlombok lomboktrue org.springframework.boot spring-boot-starter-web2.3.7.RELEASE compile org.springframework.boot spring-boot-maven-plugin
常量类:
public interface RabbitmqConstant {
String QUEUE_NAME = "QUEUE_DEMO_DIRECT";
String EXCHANGE_NAME = "direct_exchange";
String ROUTING_KEY = "ROUTING_KEY_01";
}
config类: 绑定队列,交换机和路由key
@Configuration
public class RabbitmqDirectConfig {
@Bean("bootDirectExchange")
public Exchange bootDirectExchange() {
return
ExchangeBuilder.directExchange(RabbitmqConstant.EXCHANGE_NAME).durable(true).build();
}
@Bean("bootDirectQueue")
public Queue bootDirectQueue() {
return QueueBuilder.durable(RabbitmqConstant.QUEUE_NAME).build();
}
@Bean
public Binding bindDirectQueueExchange(@Qualifier("bootDirectQueue") Queue queue,
@Qualifier("bootDirectExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(RabbitmqConstant.ROUTING_KEY).noargs();
}
}
监听类:当该列队有消息时自动消费
@Slf4j
@Component
public class RabbitmqListener {
@RabbitListener(queues = RabbitmqConstant.QUEUE_NAME)
public void ListenerQueue01(Message message, Channel channel) {
System.out.println("mess====" + message.toString());
log.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}
}
消息回调类:注意这一层是交换机有没有收到消息的回调
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
} else {
log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
回退类: 注意这里是当不可到达目的地的时候才回调,例如路由key错了,无法把消息传递给队列
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("到了回退方法");
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
controller类:
@Slf4j
@RestController
@RequestMapping("//confirm/i")
public class ConfirmController {
@Resource
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService /confirm/iCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
@GetMapping("/send/confirm/i/{msg}")
public void sendConfirmMessage(@PathVariable("msg")String msg){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.setConfirmCallback(/confirm/iCallbackService);
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String mess="boot mq hello Direct"+format.format(new Date());
rabbitTemplate.convertAndSend(RabbitmqConstant.EXCHANGE_NAME, RabbitmqConstant.ROUTING_KEY,
mess);
log.info("发送信息为:" + msg);
}
@GetMapping("/sendReturn/{msg}")
public void sendReturnMessage(@PathVariable("msg")String msg){
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(returnCallbackService);
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String mess="boot mq hello Direct"+format.format(new Date());
rabbitTemplate.convertAndSend(RabbitmqConstant.EXCHANGE_NAME, RabbitmqConstant.ROUTING_KEY+"222",
mess);
log.info("发送信息为:" + msg);
}
}
还可以把消息确认类和回退类写一起:如下
@Component
@Slf4j
public class MyCallback implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
private void init(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!");
} else {
log.info("发送者已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("到了回退方法");
log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
controller类:
@Slf4j
@RestController
@RequestMapping("//confirm/i")
public class ConfirmController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/send/confirm/i/{msg}")
public void sendConfirmMessage(@PathVariable("msg")String msg){
CorrelationData correlationData = new CorrelationData("1");
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String mess="boot mq hello Direct"+format.format(new Date());
rabbitTemplate.convertAndSend(RabbitmqConstant.EXCHANGE_NAME,
RabbitmqConstant.ROUTING_KEY,
mess);
log.info("发送信息为:" + msg);
}
@GetMapping("/sendReturn/{msg}")
public void sendReturnMessage(@PathVariable("msg")String msg){
CorrelationData correlationData = new CorrelationData("1");
SimpleDateFormat format=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String mess="boot mq hello Direct"+format.format(new Date());
rabbitTemplate.convertAndSend(RabbitmqConstant.EXCHANGE_NAME,
RabbitmqConstant.ROUTING_KEY+"222",
mess);
log.info("发送信息为:" + msg);
}
}
浏览器执行:http://127.0.0.1:8000//confirm/i/sendReturn/heheheeh222
http://127.0.0.1:8080//confirm/i/send/confirm/i/heheheeh
注意:在springboot的测试类中不会执行回调和回退方法,要以这种接口请求方式
如果 在测试类就用内部类吧:
rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> {
System.out.println("correlationData--->" + correlation);
System.out.println(ack);
if (ack) {
System.out.println("正常投递回复...");
//后续执行其他业务...
} else {
System.out.println("投递异常....");
//后续记录等操作...
}
});



