消息中间件作为互联网常用的技术手段,能够进行服务解耦、削峰填谷、异步操作,虽然这些的确可以给我们的系统带来好处,但是如果使用不当也会带来不少的坑,本文以rabbitMQ为例来分享在使用在使用消息队列中遇到的一些坑以及解决方案。
所以为分布式消息队列就是将消息分摊到各个节点上,所有节点上的消息汇总就是消息的总和。
在了解今天正文内容之前,我们先大概了解一下消息队列的模型。
从上面这张图可以看到,消息队列总共由发送者+队列+消费者组成。
所谓生产者消息丢失指的是消息在生产者投放到队列的过程中丢失,下面这张图可以大概描述消息丢失的过程。
常见的解决方案
- confirm 机制(推荐,异步方式)
我们可以采用一种模式:confirm 模式来解决同步机制的性能问题。每次生产者发送的消息都会分配一个唯一的 id,如果写入到了 RabbitMQ 队列中,则 RabbitMQ 会回传一个 ack 消息,说明这个消息接收成功。如果 RabbitMQ 没能处理这个消息,则回调 nack 接口。说明需要重试发送消息。
除此之外也可以自定义超时时间 + 消息 id 来实现超时等待后重试机制。但可能出现的问题是调用 ack 接口时失败了,所以会出现消息被发送两次的问题,这个时候就需要保证消费者消费消息的幂等性。
- 事务机制
对于 RabbitMQ 来说,生产者发送数据之前开启 RabbitMQ 的事务机制channel.txselect ,如果消息没有进队列,则生产者受到异常报错,并进行回滚 channel.txRollback,然后重试发送消息;如果收到了消息,则可以提交事务 channel.txCommit。但这是一个同步的操作,会影响性能,所以一般不推荐这种方式。
消息在队列中丢失所谓消息在队列中丢失指的是生产者已经将消息投放到了队列中,比如因为环境问题,导致已经投放到队列中的消息丢失了,队列不能再次拿到生产者的消息,从而对业务带来损失。
常见的解决方案,队列和消息的持久化,出于数据安全考虑,一般消息都会进行持久化
- 声明队列Queue的时候设置 durable=true,true表示需要持久化 false表示不需要持久化
@Bean
public Queue queue() {
//durable:是否将队列持久化 true表示需要持久化 false表示不需要持久化
return new Queue(QUEUE_NAME, true);
}
- 发送消息的时候设置消息的 deliveryMode = 2,代码如下new MessageProperties() --> DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT --> deliveryMode = 2;
- 开始/confirm/i机制,尝试重发消息
所谓消费者消息丢失值得是消费者在队列中拿到消息之后,还没来得及消费消费者就挂掉了,从而导致消费没有消费成功。
常见的解决方案
- 关闭自动ack机制,等消费者处理完下次之后手动触发ack。为了防止手动ack出问题,需要做好消息的幂等
- 尝试重试机制,但是如果超过重试次数的阈值(比如3)则记录到异常表进行人工补偿。
想象一种场景应该是先下单,然后再取消订单,如果消息乱序之后,先取消订单再去创建订单,这样子就会导致数据库中留下了一条下单成功的数据,很显然是不对的。
常见的解决方案
- 将 Queue 进行拆分,创建多个内存 Queue,消息 1 和 消息 2 进入同一个 Queue。
- 创建多个消费者,每一个消费者对应一个 Queue。
消息积压:消息队列里面有很多消息来不及消费。
- 场景 1: 消费端出了问题,比如消费者都挂了,没有消费者来消费了,导致消息在队列里面不断积压。
- 场景 2: 消费端出了问题,比如消费者消费的速度太慢了,导致消息不断积压。
常见的解决方案
- 修复代码层面消费者的问题,确保后续消费速度恢复或尽可能加快消费的速度
- 停掉现有的消费者
- 临时建立好原先 5 倍的 Queue 数量
- 临时建立好原先 5 倍数量的 消费者
- 将堆积的消息全部转入临时的 Queue,消费者来消费这些 Queue



