栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ相关--消息可靠性投递

RabbitMQ相关--消息可靠性投递

消息可靠性投递

在使用RabbitMQ时,消息生产者发送消息时会出现消息丢失或者投递失败的现象;

RabbitMQ在消息投递可靠性方面提供了两种模式:

1.Confirm 确认模式 producter->exchange

2.Return 退回模式 exchange->queue

一、RabbitMQ运作原理

二、/confirm/i确认模式介绍及实现

producer -> exchange

消息确认,生产者消息投递后,如果exchange收到消息,则会给生产者一个答应。

生产者接收应答,以确定消息成功发送到exchange。

1.修改配置文件,开启确认模式
spring:
  rabbitmq:
    publisher-/confirm/i-type: correlated 

/confirm/i-type有none、correlated、simple这三种类型
none:表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到Broker都不会触发 /confirm/iCallback回调。correlated:表示消息成功到达Broker后触发/confirm/iCalllBack回调simple:simple模式下如果消息成功到达Broker后一样会触发

2.实现RabbitTemplate./confirm/iCallback接口,实现/confirm/i方法
@Configuration
public class RabbitConfig {
    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(converter());

        // 消息是否成功发送到Exchange
        
        rabbitTemplate.set/confirm/iCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("消息成功发送到Exchange");
            } else {
                //通过correlationData携带的id可定位哪条消息发送失败,做补发操作
                log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause);
            }
        });
    }
}
3.生产者发送消息
@Service
public class Producter {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ObjectMapper objectMapper;
    
    public void sendMessage(Dto Dto) throws JsonProcessingException {
        mailDto.setMsgId(RandomUtil.randomUUID());
        String msgJson = objectMapper.writevalueAsString(Dto);
        Message message = MessageBuilder
                .withBody(msgJson.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
                .build();
        CorrelationData correlationData = new CorrelationData(mailDto.getMsgId());
        rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME,RabbitConfig.MAIL_ROUTING_KEY_NAME,message,correlationData);
    }
}
二、Return退回模式介绍和实现

exchange->queue
如果消息未能路由到目标队列则将触发回调 ReturnCallback

1.修改配置文件,开启return
rabbitmq:
  publisher-/confirm/is: true
  publisher-returns: true
2.实现returnedMessage方法
// 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
rabbitTemplate.setMandatory(true);

// 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
});
三、消费端ACK手动确认机制

消息确认三种模式:manual、none、auto

手动确认manual:消费消息后需要根据消费情况返回一个回执,成功手动调用channel.basicAck()手动签收,失败则调用channel.basicNack()方法拒收,让MQ重新发送该消息自动确认 none:默认消费者正确处理所有请求。(不设置时默认方式)根据情况确认 auto:不太常用

1.修改配置文件,开启ACK手动确认
spring:
  rabbitmq:
    publisher-/confirm/i-type: correlated
    listener:
      simple:
        acknowledge-mode: manual
没有确认,消息为Unack状态

2.消费者接收消息并手动确认消息
@Override
@RabbitListener(queues = {RabbitMQConfig.DIRECT_QUEUE})
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
    try{
        System.out.println("业务处理流程,成功则ACK");
        channel.basicAck(tag,true);
    }catch (Exception e){
        
        System.out.println("业务处理流程,失败则NACK");
        channel.basicNack(tag,false,true);
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/730706.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号