栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ防止消息丢失

RabbitMQ防止消息丢失

消息丢失场景

MQ消息丢失场景主要有三个:
1.消息生产者,发送消息后,rabbitmq服务器没有收到;导致消息丢失
2.rabbitmq收到消息后,没有持久化保存,导致消息丢失
3.消费者收到消息后,没来得及处理,消费者宕机,导致消息丢失

1、生产者发送消息没有发送到rabbit交换机

解决方案:消息异步确认机制(/confirm/i机制)
在集成springboot项目中;通过配置文件开启/confirm/i机制

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123123
spring.rabbitmq.virtual-host=/test
spring.rabbitmq.connection-timeout=15000

#开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true
#开启 return 确认机制
spring.rabbitmq.publisher-returns=true
#设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
spring.rabbitmq.template.mandatory=true

开启/confirm/i机制后,在生产者每次发送消息,都会调用回调代码;开发人员,需要写回调函数的逻辑,处理发送失败的消息

@Component
@Slf4j
public class RabbitMQ/confirm/iAndReturn implements RabbitTemplate./confirm/iCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.set/confirm/iCallback(this);
    }

    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
    	// 失败,一般解决方案,是将发送失败消息,存入定时任务队列;尝试重新发送消息;再多次失败,
    	// 就不再发送,转为人工处理
        if (!ack) {
            log.error("rabbitmq confirm fail,cause:{}", cause);
            // ...... 失败处理逻辑
        }
    }
}
2、交换机没有发送到队列

解决方案:Return模式,确保消息从交换机发送到队列。
1.开启return模式

#开启 return 机制
spring.rabbitmq.publisher-returns=true

2.开发回调函数代码

@Component
public class Sender implements RabbitTemplate.ReturnCallback {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        rabbitTemplate.setReturnCallback(this);
    }
 
    //通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)
	@Override
	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("消息主体message: " + message);
		System.out.println("消息replyCode: " + replyCode);
		System.out.println("描述: " + replyText);
		System.out.println("消息使用的交换器exchange: " + exchange);
		System.out.println("消息使用的路由键routing: " + routingKey);
	}
}
3、交换机、队列、消息没有设置持久化

交换机、队列、消息没有持久化,当rabbitmq的服务重启之后,这些信息就会丢失。

交换机持久化
在声明交换机的时候,设置持久化属性

	
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchangeName", true, false);
    }

队列持久化
在声明队列的时候,设置持久化属性

    public Queue queue() {
    	
        return new Queue("queueName", true, false, false, args);
    }

消息持久化

消息的持久化是默认持久的。无需配置

4、消费者接收到消息没有执行业务逻辑,导致消息丢失

解决方案:手动确认消息机制
配置文件配置

spring.rabbitmq.listener.simple.acknowledge-mode=manual

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: root
    password: 123123
    virtual-host: /test
    publisher-/confirm/is: true   # 开启发送确认
    publisher-returns: true  # 开启发送失败回退
    
    #开启ack
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual #采取手动应答
        #concurrency: 1 # 指定最小的消费者数量
        #max-concurrency: 1 #指定最大的消费者数量
        retry:
          enabled: true # 是否支持重试

消费者代码

@Component
public class Consumer {
	@RabbitHandler
	public void consumeMsg(String msg, Channel channel, Message message) throws IOException {
		//拿到消息延迟消费
		try {
			// .... 消费消息业务逻辑

			
			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			
		} catch (InterruptedException e) {
			e.printStackTrace();
			
			channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
		}
	}
}

当业务出现意料之外的一场;消息就会重新回到队列中;会分发到其他正常consumer中进行消费

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/354500.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号