栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

消息中间件 RabbitMQ 之 发布高级确认 详解

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

消息中间件 RabbitMQ 之 发布高级确认 详解

8. 发布高级确认

在生产环境中由于一些不明原因,导致RabbitMQ重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复

于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况下,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢?

文章目录
  • 8. 发布高级确认
    • 8.1 发布确认SpringBoot版本
      • 8.1.1 发布确认方案
      • 8.1.2 代码架构图
      • 8.1.3 配置文件
      • 8.1.4 添加配置类
      • 8.1.5 消息生产者
      • 8.1.6 消息消费者
      • 8.1.7 回调接口
      • 8.1.8 结果分析
    • 8.2 回退消息
      • 8.2.1 Mandatory 参数
      • 8.2.2 消息生产者代码
      • 8.2.3 配置类代码
      • 8.2.4 结果分析
    • 8.3 备份交换机
      • 8.3.1 代码架构图
      • 8.3.2 修改配置类
      • 8.3.3 报警消费者代码
      • 8.3.4 测试注意事项
      • 8.3.5 结果分析

8.1 发布确认SpringBoot版本
8.1.1 发布确认方案

  • 当交换机不存在时,生产者发送的消息会直接丢失,当队列不存在时,存放在队列中的消息缓存也会被清除。

  • 所以,交换机和队列只要有一个不在了,那么消息势必就会丢失

具体方案:在生产者发送消息时应当使用缓存将消息存放,并使用定时任务将未成功发送的消息进行重新投递

8.1.2 代码架构图

消息生产者将消息发送给一个直接类型的交换机,一旦交换机接收不到消息,会将消息放进缓存中

8.1.3 配置文件

在配置文件application.properties中需要添加:

#开启发布确认 发布成功后触发回调方法
spring.rabbitmq.publisher-confirm-type=correlated
  • none

    禁用发布确认模式,是默认值

  • correlated

    发布消息成功到交换机后会触发回调方法

  • simple (类似单个确认发布)

    两种效果

    其一效果和correlated模式一样

    其二,在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法,等待broker节点返回发送结果,根据返回结果判断下一步的逻辑

    要注意的点是:waitForConfirmsOrDie方法如果返回false则会关闭channel,接下来无法发送消息到broker

8.1.4 添加配置类
package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ConfirmConfig {

    
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    
    public static final String CONFIRM_ROUTING_KEY = "key1";

    
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

}
8.1.5 消息生产者
package com.example.rabbitmq.controller;

import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        //回调接口内使用
        CorrelationData correlationData = new CorrelationData("1");

        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);
        log.info("发送消息内容:{}",message);
    }

}
8.1.6 消息消费者
package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message){
        String msg = new String(message.getBody());
        log.info("接收到的队列confirm.queue消息:{}",msg);
    }

}
8.1.7 回调接口
package com.example.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : " ";
        if (b){
            log.info("交换机已经接收到id为:{}的消息",id);
        }else {
            log.info("交换机还未接收到id为 {} 的消息,由于原因:{}",id,s);
        }
    }

}
8.1.8 结果分析

浏览器发送请求: http://localhost:8080/confirm/sendMsg/hello

可以查看到交换机接收消息的情况,方便后续进行处理

8.2 回退消息
8.2.1 Mandatory 参数
  • 在仅开启了生产者确认消息的情况下,交换机接收到消息后,会直接给消息生产者确认消息,如果发现该消息不可 路由(交换机将消息发给队列),那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的

那么如何让无法被路由的消息帮我想办法处理一下?最起码通知我一声,可以自己处理。通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

在application.properties中添加:

#开启消息回退 路由不出去的消息会回退给生产者
spring.rabbitmq.publisher-returns=true

开启消息回退,路由不到队列中的消息会回退给生产者

8.2.2 消息生产者代码

添加一个错误的 routingKey 的发送消息代码,模拟消息路由到队列失败的情况

package com.example.rabbitmq.controller;

import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        //回调接口内使用
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY, message,correlationData);
        log.info("发送消息内容:{}",message);

        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_ROUTING_KEY+"2", message,correlationData2);
        log.info("发送消息内容:{}",message);
    }

}
8.2.3 配置类代码
package com.example.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;


@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : " ";
        if (b){
            log.info("交换机已经接收到id为:{}的消息",id);
        }else {
            log.info("交换机还未接收到id为 {} 的消息,由于原因:{}",id,s);
        }
    }

    
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息 {},被交换机 {} 退回,退回原因{},路由key:{}",
                returnedMessage.getMessage().getBody(),returnedMessage.getExchange(),
                returnedMessage.getReplyText(),returnedMessage.getRoutingKey());
    }
}

消息消费者代码没有改变

8.2.4 结果分析

浏览器发送请求: http://localhost:8080/confirm/sendMsg/hello

控制台显示结果:

可以看到无法路由到队列中的消息会被回退,不会造成消息的丢失。

8.3 备份交换机
8.3.1 代码架构图

对于消息无法路由到队列的情况,还有另一种解决方法,就是添加备份交换机,在备份交换机后可以添加 备份队列 和 报警队列

需要在代码中添加一个备份交换机、一个备份队列、一个报警队列、一个消费者,并将上面的确认交换机指向备份交换机,将备份交换机和两个队列绑定

8.3.2 修改配置类
package com.example.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class ConfirmConfig {

    
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
    
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    
    public static final String CONFIRM_ROUTING_KEY = "key1";
    
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        Map arguments = new HashMap<>();
        arguments.put("alternate-exchange",BACKUP_EXCHANGE_NAME);
        //确认交换机指向备份交换机  durable:是否持久化
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArguments(arguments).build();
    }

    
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange") DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    
    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    
    @Bean
    public Binding backupQueueBinding(@Qualifier("backupQueue") Queue backupQueue,
                                      @Qualifier("backupExchange") FanoutExchange backupExchange){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    
    @Bean
    public Binding waringQueueBinding(@Qualifier("warningQueue") Queue warningQueue,
                                      @Qualifier("backupExchange") FanoutExchange backupExchange){
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }

}
8.3.3 报警消费者代码
package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class WarningConsumer {

    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message){
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:{}",msg);
    }

}
8.3.4 测试注意事项

由于改变了之前的交换机代码,所以需要先在RabbitMQ后台管理界面删除之前的 确认交换机(confirm.exchange)

8.3.5 结果分析

上面的结果显示:

  • 回退消息 和 备份交换机 可以一起使用,如果两者同时开启,备份交换机的优先级更高
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/842836.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号