生产者:
@Configuration
public class RabbitFanoutConfig {
* 声明短信队列
@Bean(name = "smsQueu")
public Queue queue(){
return new Queue("sms-queue");
}
* 声明交换机 fanoutExchange
@Bean(name = "fanoutExchange")
public FanoutExchange fanoutExchange(){
return new FanoutExchange("sending-exchange");
}
* 将sms-queue绑定到了交换机上
@Bean
Binding bindSmsQueueToExchange(@Qualifier("fanoutExchange") FanoutExchange fanoutExchange,@Qualifier("smsQueu")Queue queue){
return BindingBuilder.bind(queue).to(fanoutExchange);
}
* routing模式 路由模式(可以给单个队列发送,也可以同时给多个队列发送)
@Bean
Binding bindEmailAllToDirectExchange(@Qualifier("directExchange")DirectExchange directExchange,@Qualifier("emailQueue")Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("routingKey:规则");
}
* 通配符模式可以简化routing模式,当我们同时要给队列发送时,routing需要绑定多个路由,比较繁琐
* 使用topic 可以 简化路由:
* 使用* 代表一个字符
* 使用# 代表0个或多个字符
@Bean
Binding bindSmsToTopicExchange(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("smsTopicQueue")Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("#.sms");
}
@Bean
Binding bindEmailToTopicExchange(@Qualifier("topicExchange")TopicExchange topicExchange,@Qualifier("emailTopicQueue")Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("email.#");
}
延迟队列
@Bean(name = "deadqueue")
public Queue deadQueue(){
//配置死信队列的配置
Map map = new HashMap<>();
//1.设置当前消息过期后,发送到指定的正常交换机中
map.put("x-dead-letter-exchange","normal-topic");
//2.设置转发到正常队列时的路由键
map.put("x-dead-letter-routing-key","normal");
//3.设置队列的存活时长,一般发送的时候配置
//map.put("x-message-ttl",10000);
return new Queue("dead-queue",true,false,false,map);
}
}
生产者确认机制,需要再配置文件开启确认机制
#开启确认机制 消息发送到交换机的确认
spring.rabbitmq.publisher-/confirm/is: true
#开启 交换机转发到队列的确认
spring.rabbitmq.publisher-returns: true
#将自动确认更改为手动确认
listener.simple.acknowledge-mode: manual
@Configuration
public class ConfirmCallBackConfig implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
//实例化该类对象时,首先执行的方法,修改模板类,设置确认的回调方法
@PostConstruct
public void rabbitTemplate(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("/confirm/i: " + correlationData + ", ack=" + b
+ (s == null ? "" : (", cause: " + s)));
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("信息主体为:"+message);
System.out.println("返回的code码为:"+replyCode);
System.out.println("返回的文本为:"+replyText);
System.out.println("交换机的名称:"+exchange);
System.out.println("路由键:"+routingKey);
}
}
消费者:
@Component
public class RabbitListen {
* 1.RabbitListener 监听队列的名称。
* 2.没有返回值 默认为void
* 3.当消费者消费 消息时发生报错,则该条消息被rabbit认为没有正确消费,队列种的消息不会被清除,一直尝试消费,直到成功为止
* 消息的接受的确认机制 ,有自动的ack回复,变为手动的ack
@RabbitListener(queues = "队列名称")
public void listentopSms(String msg, Channel channel, Message message) throws IOException {
try {
System.out.println("消息主体为:========="+msg);
System.out.println("消息的唯一标识:========="+message.getMessageProperties().getMessageId());
System.out.println("消息的消费顺序:========="+message.getMessageProperties().getDeliveryTag());
//使用channel 确认 消息是否被正确消费 1.消息的唯一标识。2.是否时批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
System.out.println("消息没有被正确消费");
}
}
controller
@RequestMapping("/send")
@RestController
public class SendController {
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/sendToSMS/{msg}")
public String sendToSMS(@PathVariable("msg")String msg){
rabbitTemplate.convertAndSend("publish-Top-exchange", "sms", msg, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return message;
}
});
return "success";
}
@RequestMapping("/sendLazy/{msg}")
public String sendLazy(@PathVariable("msg")String msg){
//设置额外的参数
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());
* 4.MessagePostProcessor 消息的前置增强
* 5.correlationData 携带的额外的参数
rabbitTemplate.convertAndSend("交换机名称","路由信息",消息主体, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//从message中获取到message的属性
MessageProperties messageProperties = message.getMessageProperties();
//设置message信息的唯一标识
messageProperties.setMessageId(UUID.randomUUID().toString());
//设置message信息的存活时长
messageProperties.setExpiration("10000");
return message;
}
},correlationData);
return "success";
}
}



