使用Docker搭建好一个三个节点组成的Rabbitmq集群,接下来用SpringBoot项目连接集群。
依赖:
org.springframework.boot spring-boot-starter-amqp
配置:
server:
port: 8080
spring:
rabbitmq:
# 单机连接配置host port即可
# host: 192.168.50.134
# port: 5672
# 集群连接信息
addresses: 192.168.50.134:5673,192.168.50.134:5672,192.168.50.134:5674
virtual-host: /dev
username: tech
password: tech
# 开启发送确认机制,感知消息是否到达交换机
publisher-/confirm/i-type: correlated
# 开启消息从交换机到队列的确认机制,感知消息是否到达队列
publisher-returns: true
# true表示交换机转发消息到队列失败,将消息返给发送者
template:
mandatory: true
#开启消费者手动确认
listener:
simple:
acknowledge-mode: manual
address是集群地址配置,经过多次实验,服务会先使用配置中集群节点地址的第一个地址与RabbitMQ服务器建立连接,这里应该是连接端口号为5673的RabbitMQ。
队列配置:
package com.tech.rabbitmq.spring;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME="exchange_order";
public static final String QUEUE="order_queue";
@Bean
public Exchange orderExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
@Bean
public Queue orderQueue(){
return QueueBuilder.durable(QUEUE).build();
}
@Bean
public Binding orderBinding(Queue queue,Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}
消息发送者(send接口)
package com.tech.rabbitmq.spring;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
public class MsgSendController {
@Autowired
private RabbitTemplate rabbitTemplate;
private int c=0;
@GetMapping("send")
String send() {
// for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单 "+(++c));
// }
return "ok";
}
@GetMapping("/confirm/i")
String confirm() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("correlationData={}", correlationData);
log.info("ack={}", ack);
log.info("cause={}", cause);
if (ack) {
log.info("消息发送成功");
} else {
log.info("消息发送失败");
}
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单");
//模拟发送失败,使用一个不存在的交换机
//rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+"A","order.new","新订单");
log.info("ok");
return "ok";
}
@GetMapping("return")
String ret(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("code={}",returned.getReplyCode());
log.info("returned={}",returned);
}
});
// rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单 ReturnsCallback");
// 模拟交换机转发消息到队列失败
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "A"+"order.new", "新订单 ReturnsCallback");
log.info("ok");
return "ok";
}
}
消息消费者:
package com.tech.rabbitmq.spring;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {
@RabbitHandler
public void messageHandler(String body, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+deliveryTag);
System.out.println("message="+message.toString());
System.out.println("body="+body);
//进行手动确认
//消息投递序号 是否批量
channel.basicAck(deliveryTag,false);
// if(body.contains("2")){
// channel.basicAck(deliveryTag,false);
// }
//拒收消息
//消息投递序号 是否批量 是否将消息回退到队列
// channel.basicNack(deliveryTag,false,true);
//拒收消息 (不支持批量拒收)
//消息投递序号 是否将消息回退到队列
// channel.basicReject(deliveryTag,true);
System.out.println("*****************************");
}
}
启动服务后,调用send接口,会发现队列是由服务连接的这个RabbitMQ来创建,这里是5673的端口host地址是rabbit_host2,也就是第二个节点。
假如第二个节点宕机
此时如果还是使用order_queue队列发送消息或者消费消息将会报错。因为队列是由rabbitmq2这个节点创建,消息只能存储在rabbitmq2上,而rabbitmq2处于宕机状态,所以无法使用这个队列发送或者消费消息。
启动rabbitmq2,重启服务可以正常使用这个队列了。
如果出问题的是其他节点,将不影响消息的收发。因为消息只是储存在创建队列的节点上,服务在启动时,先连接的哪个节点就是在哪个节点上创建队列(如果不存在),将来路由到这个队列的消息也是存储在这个节点上,其他节点只是同步元数据,并不会同步消息。



