栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot整合RabbitMQ详细应用教程(2)-发送消息确认回调及手动确认消息

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot整合RabbitMQ详细应用教程(2)-发送消息确认回调及手动确认消息

一、导入maven依赖,我使用的版本和parent的版本一致2.3.12.RELEASE

        
        
            org.springframework.boot
            spring-boot-starter-amqp
        

 二、开启/confirm/iCallback、ReturnCallback及手动ack配置

spring:
  rabbitmq:
    host: 101.xxx.xx.xx
    port: 5672
    username: admin
    password: admin

    #开启发送到交换机确认callback
    publisher-/confirm/i-type: correlated
    #开启发送到队列失败returnCallback
    publisher-returns: true
    #消息是否强制回退 如果此值为空才取publisher-returns值
    template:
      mandatory: true
    #开启手动确认消息
    listener:
      direct:
        acknowledge-mode: manual
      simple:
        acknowledge-mode: manual

三、新建类继承/confirm/iCallbak、ReturnCallback,具体可见如下demo

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.PostConstruct;
import java.util.UUID;

@Service
@Slf4j
public class SendWithConfirmService implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        if (ack) {
            log.info("消息已发送到交换器 cause:{} - {}" , s , correlationData.toString());
        } else {
            log.info("消息未发送到交换器 cause:{} - {}" , s , correlationData.toString());
        }
    }

    
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        log.info("消息被退回 {}" , message.toString());
    }

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    @Transactional(rollbackFor = Exception.class)
    public void send(String exchange,String routingKey,String data) {
        //注意如果需要confirmCallBack 需要传CorrelationData
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchange,routingKey,data,correlationData);
    }
}

四、手动确认消费消息和未消费消息,channel.basicAck、channel.basicNack使用及参数详解如下

    @RabbitListener(queues = "TestDireQueue3")
    public void receive3(String data, Message message,Channel channel) {
        log.info("TestDireQueue3 receive message================={}",data);
        try {
            //第一个参数是消息的index(可以理解为消息的唯一id)
            //第二个参数是是否开启批量模式,true-一次ack所有消息该消息index的消息,提高效率
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
            
            //未被消费
            //前两个参数和basicAck一样
            //第三个参数为 是否重新回到队列
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            log.info("TestDireQueue3中{}已被消费",data);
        } catch (IOException e) {
            log.error("error",e);
        }
    }

五、需要注意的一些地方及问题

1、如果开启了/confirm/iCallback、returnCallback可一在回调的方法做些额外处理,例如结合数据库进一步保证消息100%投递,或者重发,提示等等操作,但是需要注意的是高并发处理消息的效率会降低

2、配置如果开启了手动ack,所有的消息都需要手动确认,不然消息会一直存在队列中只是状态有所变化为unack,会被不断重新消费,所以一定要调用channel.basicAck方法

3、消费消息不要抛出异常,异常中断也不会正常消费消息,会导致消息死循环一直重复消费

4、小概率出现问题,特别是刚接触时,注意Channel的引用包import com.rabbitmq.client.Channel,因为有很多Channel的包,引错的话会报错org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.nio.channels.Channel] for GenericMessage……,注意查看报错信息也是可以发现的,是因为引用了java.nio.channels.Channel

后续记录创建队列时其它的属性参数设置,例如设置消息的有效持续时间即TTL,死信队列,变相的延迟队列等等

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/683896.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号