栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

SpringBoot集成RabbitMQ实现发布确认

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

SpringBoot集成RabbitMQ实现发布确认

前言

何为发布确认?

字面意思:P发送消息之后,能够确定C收到消息了

问题

P发送消息要通过交换机和队列,所以只需要确保这二者都不出问题,或者有PlanB来防止出意外即可

问题一:交换机出问题

不明原因,导致交换机重启,在重启过程中,生产者投递消息失败,导致消息丢失;

交换机宕机了,没有集群

。。。

问题二:队列出问题

交换机和队列之间的bingKey错误,导致找不到指定队列;

队列TTL已到

。。。

解决方案

交换机问题解决

交换机是否收到消息,生产者是不知道的,所以需要写一个回调方法,通知生产者,消息是否被交换机接收到

队列问题解决

交换机是否找到对应队列,交换机如果找不到对应队列,会将消息丢掉,这种情况是不允许发生的,所以需要把不可达的消息返回交换机,交给其他(备份)交换机重新发送

流程图

实现步骤 声明配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    publisher-/confirm/i-type: correlated       #开启发布确认 默认是关闭的
    publisher-returns: true                  #开启消息回退  交换机找不到队列时,会直接丢弃消息,回退给生产者
声明 确定队列,备份队列,警告队列,业务交换机,备份交换机,以及它们之间的绑定关系
@Configuration
public class /confirm/iConfig {

    public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
    public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";

    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";
    
    //声明确定队列
    @Bean("confirQueue")
    public Queue queue(){
        return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
    }

    //声明业务交换机
    @Bean("confirExchange")
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(/confirm/i_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
    }

    //绑定确定队列和业务交换机
    @Bean
    public Binding bindingQueueToExchange(@Qualifier("confirQueue") Queue confirQueue,
                                          @Qualifier("confirExchange")DirectExchange confirExchange){
        return BindingBuilder.bind(confirQueue).to(confirExchange).with("key1");
    }

    //声明备份 Exchange
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }
    
    //声明备份队列
    @Bean("backQueue")
    public Queue backQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }
    //声明报警队列
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    //声明备份队列和 备份交换机之间的绑定关系
    @Bean
    public Binding backupBinding(@Qualifier("backQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange backupExchange){
        return BindingBuilder.bind(queue).to(backupExchange);
    }

    //声明报警队列和 备份交换机之间的绑定关系
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                  @Qualifier("backupExchange") FanoutExchange
                                          backupExchange){
        return BindingBuilder.bind(queue).to(backupExchange);
    }
}
声明回滚类
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate./confirm/iCallback,RabbitTemplate.ReturnsCallback{
    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String s) {
        String id = correlationData!=null?correlationData.getId():"";
        if (ack){
           log.info("交换机已收到id为:{}",id);
        }else{
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}",id,s);
        }
    }

    
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("消息:{},被交换机:{}退回,退回原因:{},路由key:{}",returnedMessage.getMessage(),returnedMessage.getExchange(),returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
    }
}
声明生产者
@RestController
@RequestMapping("//confirm/i")
@Slf4j
public class /confirm/iProducer {

    public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MyCallBack myCallBack;

    //依赖注入rabbitMQ之后再设置它的回调对象
    @PostConstruct
    public void init(){
        rabbitTemplate.set/confirm/iCallback(myCallBack);
        rabbitTemplate.setReturnsCallback(myCallBack);
    }

    //发送消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //指定消息id为1
        CorrelationData correlationData1 = new CorrelationData("1");
        String routintKey = "key1";
        rabbitTemplate.convertAndSend(/confirm/i_EXCHANGE_NAME,routintKey,message+routintKey,correlationData1);
        log.info("发送的消息是:{}",message);
        
        //指定消息id为2
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(/confirm/i_EXCHANGE_NAME,routintKey+123,message+routintKey,correlationData2);
        log.info("发送的消息是:{}",message);
    }
}
声明确认消费者
@Component
@Slf4j
public class /confirm/iCustomer {

    public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";

    @RabbitListener(queues = /confirm/i_QUEUE_NAME)
    public void receiveMsg(Message message){
        String msg = new String(message.getBody());
        log.info("接收确定队列传来的消息:{}",msg);
    }
}
声明警告消费者
@Component
@Slf4j
public class WarningConsumer {

    public static final String WARNING_QUEUE_NAME = "warning.queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:{}", msg);
    }
}
启动测试
http://localhost:8080//confirm/i/sendMessage/你爱我,我爱你
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/328039.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号