栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq 消息丢失和消息手动确认处理

RabbitMq 消息丢失和消息手动确认处理

1.安装依赖

	org.springframework.boot
    spring-boot-starter-amqp

2.配置rabbitmq
  1. 配置文件
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: zytool
    password: 123456
    virtual-host: /
    publisher-returns: true  # 开启发送消息失败返回 对应RabbitTemplate.ReturnCallback接口
    publisher-/confirm/i-type: correlated # publisher-/confirm/is和publisher-returns是对于消息生产端的配置
    listener:
      simple:
        #并发数与最大并发数
        concurrency: 2
        max-concurrency: 2
        #预取数
        prefetch: 6
        acknowledge-mode: manual  #开启手动确认模式(针对于消息消费端)
  1. 配置交换机、队列、以及绑定
@Configuration
@Slf4j
public class RabbitMqConfig implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.set/confirm/iCallback(this);            // 指定 /confirm/iCallback
        rabbitTemplate.setReturnCallback(this);             // 指定 ReturnCallback
    }

    
    @Bean
    public DirectExchange myDirectExchange() {
        // 参数意义:
        // name: 名称
        // durable: true
        // autoDelete: 自动删除
        return new DirectExchange(MqConstant.NEWSEXCHANGE, true, false);
    }

    
    @Bean
    public Queue myDirectQueue() {
        return new Queue(MqConstant.NEWSQUEUE, true);
    }

    
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(myDirectQueue())
                .to(myDirectExchange())
                .with(MqConstant.NEWSROUTINGKEY);
    }
}
3. 实现消息发送确认

配置类需要实现RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback

    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息成功到达exchange!");
        } else {
            log.info("消息未能成功到达exchange!{}", correlationData.getReturnedMessage());
            log.info("消息到达Exchange失败原因:{}", cause);
            // 根据业务逻辑实现消息补偿机制。如记录到数据库或者日志里面,通过定时任务再去跑

        }
    }

    
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息报文:{}", new String(message.getBody()));
        log.info("消息编号:{}", replyCode);
        log.info("描述:{}", replyText);
        log.info("交换机名称:{}", exchange);
        log.info("路由名称:{}", routingKey);

        // 根据业务逻辑实现消息补偿机制

    }
4. 实现消息消费确认

这里我通过 NewsLister 类来监听队列

@Component
@Slf4j
public class NewsListener {

    @Autowired
    private NewsInfoService newsInfoService;

    @RabbitHandler
    @RabbitListener(queues = "newsDirectQueue")
    public void getNews(Object rabbitObject, Message message, Channel channel) throws InterruptedException, IOException {
        String msg = new String(message.getBody());
        Boolean ret = false;
        try {
            Thread.sleep(1000);
            NewsInfoVO newsInfoVO = JSONObject.parseObject(msg, NewsInfoVO.class);
            long snowId = SnowflakeIdUtil.getSnowId();
            newsInfoVO.setId(snowId + "");
            newsInfoVO.setStatus(CommonConstant.NEWS_SUBMIT);
            newsInfoVO.setCreateBy(666l);
            newsInfoVO.setCreateTime(new Date());
            newsInfoService.save(newsInfoVO);
            ret = true;
        } catch (Exception e) {
            // redelivered = true, 表明该消息是重复处理消息
            Boolean redelivered = message.getMessageProperties().getRedelivered();

            
            try {
                if (redelivered) {
                    
                    // 消息已重复处理失败, 扔掉消息
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
                    log.error("消息[{}]重新处理失败,扔掉消息", msg); //可以记录到数据库
                }

                // redelivered != true,表明该消息是第一次消费
                if (!redelivered) {
                    // 消息重新放回队列
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                    log.error("消息[{}]处理失败,重新放回队列", msg);
                }
            } catch (Exception e1) {
                e1.printStackTrace();
            }

        } finally {
            if (ret) {
                //消息确认Ack
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
                log.info("消息消费正常!");
            } else {
                log.info("消息消费异常!");
            }
        }
    }
}
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/654030.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号