栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

【MQ】RabbitMQ 消息确认、堆积、丢失、重复消费等

【MQ】RabbitMQ 消息确认、堆积、丢失、重复消费等

AMQP

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受中间件不同产品、不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

RabbitMQ

信道:信道是生产消费者与rabbit通信的渠道,生产者发布publish或是消费者订阅subscribe一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接。也就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用。

问:为什么不建立多个TCP连接呢?原因是rabbit保证性能,系统为每个线程开辟一个TCP是非常消耗性能,每秒成百上千的建立销毁TCP会严重消耗系统性能。所以rabbitmq选择建立多个信道(建立在tcp的虚拟连接)连接到rabbit上。

交换器、队列、绑定、路由键

队列通过路由键绑定到交换器,生产者将消息发布到交换器,交换器根据绑定的路由键将消息路由到特定队列,然后由订阅这个队列的消费者进行接收。

RabbitMQ的三种Exchange

direct、fanout、topic

【RabbitMQ】RabbitMQ的三种Exchange_keeper42的博客-CSDN博客

常见问题

如果消息达到无人订阅的队列会怎么办?

消息会一直在队列中等待,RabbitMq默认队列是无限长度的。

多个消费者订阅到同一队列怎么办?

消息以循环的方式发送给消费者,每个消息只会发送给一个消费者。

消息路由到了不存在的队列怎么办?

一般情况下,RabbitMq会忽略,当这个消息不存在,也就是这消息丢了。

消息确认

消费者收到的每一条消息都必须进行确认(自动确认和消费者自行确认)。

消费者在声明队列时,可以指定autoAck参数,当autoAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。

采用消息确认机制后,只要令autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

当autoAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列。

RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。

消息堆积

1. 消息堆积的后果

新消息无法进入队列、旧消息无法丢失、消息等待消费时间过长以至于超出了业务容许的范围。

2. 消息堆积的原因

生产者突然大量发布消息、消费者来不及消费或消费失败、消费者出现性能瓶颈、消费者直接挂掉了。

3. 如何解决消息堆积

(1)排查生产者,是否突然大量发布消息,限制下

(2)排查消费者,消费性能瓶颈,增加消费者的多线程处理(缩短线程休眠时间等)、部署多个消费者

(3)排查消息队列,可以想办法把消息按顺序的转移到另外一个新的队列,让消费者消费新队列中的消息。

(4)可以通过修改RabbitMQ的两个参数来增大消费消息的并发数:

concurrentConsumers:对每个listener在初始化的时候设置的并发消费者的个数。

prefetchCount:每次一次性从broker里面取的待消费的消息的个数,prefetchCount是BlockingQueueConsumer内部维护的一个阻塞队列 linkedBlockingQueue 的大小,其作用就是如果某个消费者队列阻塞,就无法接收新的消息,该消息会发送到其它未阻塞的消费者。

消息丢失

消息分别在生产者、消息队列、消费者中丢失:

1. 消息在生产者丢失

原因:生产者发送消息成功,但MQ没收到该消息,一般由网络不稳定造成。

解决方案:发送方采用消息确认机制,当消息成功被MQ接收到后,会给生产者发送一个确认消息,表示接收成功。RabbitMQ发送方确认模式有三种,普通确认、批量确认、异步确认。Spring整合RabbitMQ后只使用了异步监听确认模式。

2. 消息在队列中丢失

原因:消息发送到MQ后,消息还没被消费却在MQ中丢失了。比如MQ服务器宕机或者未进行持久化就进行了重启。

解决方案:持久化交换机(Exchange)、队列、消息。确保MQ服务器异常重启时仍然能从磁盘恢复对应的交换机,队列和消息。然后我们把MQ做多台分布式集群,防止出现某一MQ服务器挂掉~

3. 消息在消费者丢失

原因:默认消费者消费消息时,设置的是自动回复MQ收到了消息。MQ会立刻删除自身保存的这条消息,如果消息已经在MQ中被删除,但消费者的业务处理出现异常或消费者服务宕机,那么就会导致该消息没有处理成功从而导致消息丢失。

解决方案:消费者向MQ的回复我们设置成手动回复(配置成手动ACK)。当消费者出现异常或者服务宕机时,MQ服务器不会删除该消息,而是会把消息重发给绑定该队列的消费者,如果该队列只绑定了一个消费者,则该消息会一直保持在MQ服务器,直到消费者能正常消费为止。

正常业务逻辑应该是本地业务执行成功,手动ack这条消息。如果业务执行完毕,手动ack的时候恰好服务宕机了,重启……这不是会造成重复消费吗?没错,这就牵扯mq的另一个问题了,mq消息重复消费~

重复消费 1.场景

因消息重发机制会出现消息重复消费的情况

2.解决方案

(1)幂等操作,同一个操作执行N次,结果不变。

(2)若实际业务中用不了幂等,则保存消息id到数据库(Redis)中,每次消费前查看消息是否已经被消费过。

有序消费 1.场景

在work queue模式下,只有一个队列,但存在多个消费者。多个消费者线程的竞争会导致数据乱序。
在简单队列模式下,同样的多个消费者线程也会导致数据乱序。


2.解决方案
使用多个队列,对消息的id值做hash。再对队列数取模(hash值%队列数),将结果相同的消息压入同一个队列中去,这就保证了一个队列中有且仅有一个消费者。
在MQ队列后的Java代码中(消费方),再为每一个线程加一个内存队列,根据消息的id求hash值,然后把相同的结果压入同一个内存队列……
 


 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/775062.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号