在有多个系统组成的应用中,常常出现A系统会影响B、C、D数据的情况。通常做法是在A中调用其他系统的接口,这样系统之间的依赖太大,如果后续添加新的系统,就需要在A中添加相应的逻辑。这样做耦合程度太大,不利于维护。
加入MQ后,A系统中不用添加其他系统的调用,只需要发送消息,其他系统监听消息,在自己系统中处理。新增或者删除也不需要改动A系统的代码,只需要在自己中取消该类型的消息监听就行。
很多时候涉及多服务之间调用的情况,客户端发起请求,A中回去调用B、C、D的接口,最后再将执行结果返回到客户端,这样一个流程中A接口的执行时间,收到其他服务的影响,是他们执行时间的总和,如果A不关心B、C、D他们的执行情况,就可以使用MQ。A发送消息后直接返回,从而提升接口的响应时间。
削峰
当系统面临大量请求时,会对数据库造成很大压力,引入MQ后,你可以根据数据库的实际处理能力,每次从MQ中拿一定数量的数据处理,处理完从中取。
直接去官网下载安装包。
https://www.rabbitmq.com/
// 拉去镜像 docker pull rabbitmq // 启动容器 docker run -it --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -d rabbitmq // 进入容器开启管理界面 docker exec -it rabbitmq sh //开启管理界面 rabbitmq-plugins enable rabbitmq_management
通过访问http://localhost:15672/ 即可看到管理界面
在Springboot集成RabbitMQ 引入依赖配置队列和交换机org.springframework.boot spring-boot-starter-amqp
@Configuration
public class RabbitmqConfig {
@Bean
public Queue msgQueue(){
return new Queue("MSG_MQ", true, false, false);
}
@Bean
public DirectExchange msgExchange(){
return new DirectExchange("MSG_ECHANGE", true, false);
}
@Bean
public Binding mailBinding(){
return BindingBuilder
.bind(mailQueue())
.to(msgExchange())
.with("MSG_ROUTING");
}
}
生产者发送消息
@RequestMapping("/v1/demo")
@RestController
public class DemoController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sed_queue")
public void sendMsg(){
rabbitTemplate.convertAndSend("MSG_ECHANGE", "MSG_ROUTING", "你好惨:"+ System.currentTimeMillis());
}
}
消费者接受消息
使用 @RabbitListener 去监听消息队列,队列中有消息了就去消费
@Component
public class RabbitListner {
@RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
public void handleMsg(String msg){
System.out.println("msg-"+ msg);
}
}
RabbitMQ的组成queues 和queuesToDeclare 不同点 :使用queuesToDeclare时,服务启动时回去MQ中检测监听的队列是否存在,没有这个队列会就会去创建
- Broker:消息队列服务进程。此进程包括两个部分:Exchange和Queue。
- Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列。
- Queue:消息队列,存储消息的队列。
- Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
- Consumer:消息消费者。消费队列中存储的消息。
- DirectExchange:直连交换机,需要绑定一个队列,同时需要指定routeKey值,类似于点对点发送。上面demo中使用的就是DirectExchange
- FanoutExchange: 将有需要的队列与此交换机绑定后,一个发送到交换机上的消息会被转发到与这个交换机相连的所有队列上。这种模式类似于发布订阅。
@Bean
public Queue faQueue1(){
return new Queue("fa.queue1", true, false, false);
}
@Bean
public Queue faQueue2(){
return new Queue("fa.queue2", true, false, false);
}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout.exchange", true, false);
}
@Bean
public Binding bindingFanoutExchange(){
return BindingBuilder
.bind(faQueue1())
.to(fanoutExchange());
}
@Bean
public Binding bindingFanoutExchange1(){
return BindingBuilder
.bind(faQueue2())
.to(fanoutExchange());
}
@RabbitListener(queuesToDeclare = @Queue("fa.queue1"))
public void faQueue1(String msg){
System.out.println("faQueue1-"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("fa.queue2"))
public void faQueue2(String msg){
System.out.println("faQueue2-"+msg);
}
@GetMapping("/sed_fanout")
public void sendFanoutMsg(){
rabbitTemplate.convertAndSend("fanout.exchange", null, "fanoutExchange:"+ System.currentTimeMillis());
}
- TopicExchange: 主题交换机又可以叫通配符交换机。这种交换机通过通配符去匹配,然后路由到对应的队列。通配符 # 和*分别代表匹配多个和一个。
@Bean
public Queue topicQueue1(){
return new Queue("topic.queue1", true, false, false);
}
@Bean
public Queue topicQueue2(){
return new Queue("topic.queue2", true, false, false);
}
@Bean
public TopicExchange topicExchange1(){
return new TopicExchange("topic.exchange1", true, false);
}
@Bean
public Binding topicBinding1(){
return BindingBuilder
.bind(topicQueue1())
.to(topicExchange1())
.with("top.*");
}
@Bean
public Binding topicBinding2(){
return BindingBuilder
.bind(topicQueue2())
.to(topicExchange1())
.with("top.#");
}
@RabbitListener(queuesToDeclare = @Queue("topic.queue1"))
public void topicQueue1(String msg){
System.out.println("topicQueue1-"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("topic.queue2"))
public void topicQueue2(String msg){
System.out.println("topicQueue2-"+msg);
}
@GetMapping("/sed_topic")
public void sendFanoutMsg(String key){
rabbitTemplate.convertAndSend("topic.exchange1", key, "TopicExchange:"+ System.currentTimeMillis());
}
- HeadersExchange: 这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。这种交换机用的不多。
@Bean
public Queue headQueue(){
return new Queue("head.queue1", true, false, false);
}
@Bean
public Queue headQueue1(){
return new Queue("head.queue2", true, false, false);
}
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange("head.exchange", true, false);
}
@Bean
public Binding headBinding(){
Map headers = new HashMap<>();
headers.put("abk", "asd");
return BindingBuilder
.bind(headQueue())
.to(headersExchange())
.whereAll(headers)
.match();
}
@Bean
public Binding headBinding1(){
Map headers = new HashMap<>();
headers.put("abk", "ack");
return BindingBuilder
.bind(headQueue1())
.to(headersExchange())
.whereAll(headers)
.match();
}
@RabbitListener(queuesToDeclare = @Queue("head.queue1"))
public void headQueue1(String msg){
System.out.println("headQueue1-"+msg);
}
@RabbitListener(queuesToDeclare = @Queue("head.queue2"))
public void headQueue2(String msg){
System.out.println("headQueue2-"+msg);
}
@GetMapping("/sed_head_msg")
public void sendHeaderMsg1(@RequestParam String msg,
@RequestBody Map map){
MessageProperties messageProperties = new MessageProperties();
//消息持久化
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
//添加消息
messageProperties.getHeaders().putAll(map);
Message message = new Message(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("head.exchange", null, message);
}
用postman调用请求
后台打印出
说明匹配到了head.queue2
同理,设置head的值
图中显示的是一条消息传递的整个过程,我们大致可以分析出那些环节会导致消息不可靠或者说消息丢失。
- 生产者发送消息到MQ过程中,MQ挂了,这时会出现消息丢失。
- 生产者发送消息到MQ但是没有持久化队列,消费者还没消费,MQ挂了,消息会丢失。
- 消费者消费了消息,但是出现报错或者程序挂了,这时消息也丢失了。
针对于以上的是三种情况,Rabbit为我们提供了对应的解决方案:持久化、/confirm/i机制、ACK事务机制。
消息持久化配置Exchange持久化和Queue持久化。
在创建Queue 和Exchange时设置 durable 为true
你也可以使用默认值,默认为true
交换机同样如此
在生产者发送消息到MQ这段过程中,MQ挂了,导致消息丢失。Rabbit提供/confirm/i和returnMessage方法去处理消息丢失。
springboot 添加配置
## 新版中使用 publisher-/confirm/i-type 有三个参数 # none(禁用) # correlated(触发/confirm/i回调) # simple(具有correlated的功能 同时可以使rabbitTemplate调用waitFor/confirm/is或waitFor/confirm/isOrDie) # 旧版中 publisher-confirms 默认 false spring.rabbitmq.publisher-/confirm/i-type=simple # 消息没有匹配到队列 触发returnMessage 回调 spring.rabbitmq.publisher-returns= true # publisher-returns 和 mandatory 同时使用时优先使用 mandatory spring.rabbitmq.template.mandatory= true
实现 RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback
@Component
public class RabbitCallback implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String error) {
if(ack){
System.out.println("消息发送成功");
} else {
System.out.println("消息发送失败");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("replyCode:").append(replyCode).append(",")
.append("replyText:").append(replyText).append(",")
.append("exchange:").append(exchange).append(",")
.append("routingKey:").append(routingKey).append(",");
}
}
没有匹配到路由触发returnMessage
找到交换机触发/confirm/i
没有找到交换机和队列
消息确认机制解决了消息发送MQ这个过程中的问题,ACK则是解决消费者处理过程中消息丢失的问题。
消费者接受消息,在处理过程中出现失败手动拒签,重新放回队列等待再次消费,消费成功后手动签收。
配置手动模式
### 开启手动模式 spring.rabbitmq.listener.simple.acknowledge-mode=manual ## 最小消费者数量 spring.rabbitmq.listener.simple.concurrency=1 ## 最大消费者数量 spring.rabbitmq.listener.simple.max-concurrency=1
改造消费者
@RabbitListener(queuesToDeclare = @Queue("MSG_MQ"))
public void handleMsg(String msg, Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
if("success".equals(msg)){
channel.basicAck(deliveryTag, false);
} else if("reply".equals(msg)) {
// basicReject 和 basicNack的区别 basicReject不支持批量 basicNack不支持
// channel.basicReject(deliveryTag, true);
channel.basicNack(deliveryTag, false, true);
} else {
channel.basicNack(deliveryTag, false, false);
}
}
basicAck : 成功确认消息
- deliveryTag: 消息的index
- mutiple: 是否批量确认, 为true时,一次ack所有小于deliveryTag的消息
basicReject: 失败拒绝
- deliveryTag: 消息的index
- requeue: 是否重新放入队列
basicNack: 失败拒绝 - deliveryTag: 消息的index
- mutiple:批量拒绝,一次性拒绝所有小于deliveryTag的消息
- requeue: 是否重新放入队列
- nack死循环
reply 的消息重新放入队列后,程序还是处理不了,会出现死循环,不断地消费,放入队列,知道问题解决。
我的想法是用数据库去保存消息信息。然后通过定时任务再去处理或者通过界面反馈通知 - double ack
开启自动ACK的时候又在代码中手动处理,导致一个消息触发两次ack,有一次ack会失败。 - 性能消耗
手动ack模式会比自动模式慢是10倍左右,很多时候使用默认的就行了。 - 手动ack,不及时回复会导致队列异常



