原文网址:RabbitMQ--消息堆积/消息积压--解决/处理/方案--消息异常_IT利刃出鞘的博客-CSDN博客
消息堆积原因- 消息堆积即消息没及时被消费,是生产者生产消息速度快于消费者消费的速度导致的。
- 消费者消费慢可能是因为:本身逻辑耗费时间较长、阻塞了。
- 给消息设置年龄,超时就丢弃
- 考虑使用队列最大长度限制
- 减少发布频率
- 增加消费者的处理能力 //优化代码;使用JDK的队列缓存数据,多线程去处理(一般考虑顺序问题,采用单例线程)
- 建立新的queue,消费者同时订阅新旧queue,采用订阅模式
- 默认情况下,rabbitmq消费者为单线程串行消费(org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer类的concurrentConsumers与txSize(对应prefetchCount)都是1),设置并发消费两个关键属性concurrentConsumers和prefetchCount。concurrentConsumers:设置的是对每个listener在初始化的时候设置的并发消费者的个数;prefetchCount:每次从broker里面取的待消费的消息的个数。
配置方法:修改application.properties:spring.rabbitmq.listener.concurrency=m spring.rabbitmq.listener.prefetch=n
Spring Amqp的解释:
prefetchCount(prefetch) The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode is NONE. This will be increased, if necessary, to match the txSize concurrentConsumers(concurrency) The number of concurrent consumers to initially start for each listener.其他网址:RabbitMQ消费者的几个参数 - 简书
- 生产者端缓存数据,在mq被消费完后再发送到mq,打破发送循环条件。设置合适的qos值(channel.BasicQos()方法:每次从队列拉取的消息数量),当qos值被用光,而新的ack没有被mq接收时,就可以跳出发送循环,去接收新的消息。
- 消费者主动block接收进程,消费者感受到接收消息过快时主动block,利用block和unblock方法调节接收速率,当接收线程被block时,跳出发送循环。
其他网址:完了!生产事故!几百万消息在消息队列里积压了几个小时!_一诺-CSDN博客
情况1:堆积的消息还需要使用方案1:简单修复
修复consumer的问题,让他恢复消费速度,然后等待几个小时消费完毕
方案2:复杂修复
临时紧急扩容了,具体操作步骤和思路如下:
1)先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉
2)新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量
3)然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue
4)接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
5)这种做法相当于是临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
6)等快速消费完积压数据之后,得恢复原先部署架构,重新用原先的consumer机器来消费消息
删除消息即可。
见:RabbitMQ--安装/配置/使用/用法_IT利刃出鞘的博客-CSDN博客



