1.在application.properties添加spring.rabbitmq.publisher-/confirm/i-type=correlated
# 应用名称 spring.application.name=demo spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-/confirm/i-type=correlated
2.添加配置类
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class /confirm/iConfig {
public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";
//声明业务 Exchange
@Bean("/confirm/iExchange")
public DirectExchange /confirm/iExchange(){
return new DirectExchange(/confirm/i_EXCHANGE_NAME);
}
// 声明确认队列
@Bean("/confirm/iQueue")
public Queue /confirm/iQueue(){
return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
}
// 声明确认队列绑定关系
@Bean
public Binding queueBinding(@Qualifier("/confirm/iQueue") Queue queue,
@Qualifier("/confirm/iExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
3.创建MyCallBack配置文件
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnCallback{
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
String id=correlationData!=null?correlationData.getId():"";
if(ack){
log.info("交换机已经收到 id 为:{}的消息",id);
}else{
log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,cause);
}
}
//当消息无法路由的时候的回调方法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error(" 消 息 {}, 被交换机 {} 退回,退回原因 :{}, 路 由 key:{}",new
String(message.getBody()),exchange,replyText,routingKey);
}
}
4.消息生产者
import com.rabbitmq.demo.config.MyCallBack;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
@RestController
@RequestMapping("//confirm/i")
@Slf4j
public class /confirm/iProducer {
public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init() {
rabbitTemplate.set/confirm/iCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
//指定消息 id 为 1
CorrelationData correlationData1 = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(/confirm/i_EXCHANGE_NAME, routingKey, message + routingKey, correlationData1);
CorrelationData correlationData2 = new CorrelationData("2");
routingKey = "key2";
rabbitTemplate.convertAndSend(/confirm/i_EXCHANGE_NAME, routingKey, message + routingKey, correlationData2);
log.info("发送消息内容:{}", message);
}
}
4.消息消费者
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class /confirm/iConsumer {
public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";
@RabbitListener(queues =/confirm/i_QUEUE_NAME)
public void receiveMsg(Message message){
String msg=new String(message.getBody());
log.info("接受到队列 /confirm/i.queue 消息:{}",msg);
}
}



