栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Rabbitmq的消息确认机制(十一)

Rabbitmq的消息确认机制(十一)

一、前言

RabbitMq提供了消息确认机制,主要分为生产者端发送消息确认和消费者端的消费消息确认。
1、生产者端发送消息确认又分为Confirm 消息确认机制和Return 消息机制

2、消费者端消息接收确认采用的是ack模式

  • AcknowledgeMode.NONE :自动确认

  • AcknowledgeMode.AUTO:根据情况确认
    如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认

    当抛出 AmqpRejectAndDontRequeueException 异常的时候,则消息会被拒绝,且 requeue = false(不重新入队列)

    当抛出 ImmediateAcknowledgeAmqpException 异常,则消费者会被确认

    其他的异常,则消息会被拒绝,且 requeue = true(如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的)。可以通过 setDefaultRequeueRejected(默认是true)去设置

  • AcknowledgeMode.MANUAL:手动确认

二、生产者端消息确认机制 2.1 引入 rabbitmq 依赖


    4.0.0

    org.example
    springboot-rabbitmq-fanout-producer
    1.0-SNAPSHOT

    
        8
        8
    

    
        
            
                org.springframework.boot
                spring-boot-dependencies
                2.3.2.RELEASE
                pom
                import
            
        
    

    
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-test
        
        
            org.projectlombok
            lombok
            1.18.20
        
    

2.2 application.yml
# 服务端口
server:
  port: 8080
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /

    #由于开启了集群,不再建议使用此方式配置服务地址,若集群含有多个IP地址,不方便指定
#    host: 192.168.229.128
#    port: 5672
    #集群地址配置:指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
    addresses: 192.168.229.128:5672
    # 开启发送确认(有些博文写的是publisher-/confirm/is: true,但其实此方式已经被遗弃,替换为publisher-/confirm/i-type)
    publisher-/confirm/i-type: correlated
#    # 开启发送失败退回(例如:消息有没有找到合适的队列)
    publisher-returns: true

springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-/confirm/i-type = correlated 实现相同效果

在springboot2.2.0.RELEASE版本之前是amqp正式支持的属性,用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-/confirm/i-type属性配置代替,用来配置更多的确认类型:

  • NONE值是禁用发布确认模式,是默认值

  • CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例

  • SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用,waitFor/confirm/is或waitFor/confirm/isOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitFor/confirm/isOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

2.3 Exchange 和 Queue
package com.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {

    //创建队列
    @Bean
    public Queue directEmailQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("email.direct.queue", true);
    }
    @Bean
    public Queue directSmsQueue() {
        return new Queue("sms.direct.queue", true);
    }
    @Bean
    public Queue directWeixinQueue() {
        return new Queue("weixin.direct.queue", true);
    }


    //创建交换机
    @Bean
    public DirectExchange directOrderExchange() {
        return new DirectExchange("direct_order_exchange", true, false);
    }


    //绑定关系
    @Bean
    public Binding directEmailBinding() {
        return BindingBuilder.bind(directEmailQueue()).to(directOrderExchange()).with("email");
    }
    @Bean
    public Binding directSmsBinding() {
        return BindingBuilder.bind(directSmsQueue()).to(directOrderExchange()).with("sms");
    }
    @Bean
    public Binding directWeixinBinding() {
        return BindingBuilder.bind(directWeixinQueue()).to(directOrderExchange()).with("weixin");
    }
}

2.4 消息发送确认

发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。

消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。(无论成功失败都有返回)

消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。(失败时才会有返回)

我们可以利用这两个Callback来确保消的100%送达。

2.4.1 /confirm/iCallback确认模式

消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

package com.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class /confirm/iCallbackService implements RabbitTemplate./confirm/iCallback {
    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
        log.info("消息标识:" + correlationData.toString());
        log.info("发送成功确认:"+ack);
        log.info("错误原因:"+cause);
    }
}

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。

但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback 。

2.4.2 ReturnCallback 退回模式

如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

package com.callback;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("消息:"+message.toString());
        log.info("返回码:"+replyCode);
        log.info("返回描述:"+replyText);
        log.info("交换机:"+exchange);
        log.info("路由key:"+routingKey);
    }
}

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

2.4.3 测试
    @Test
    public void contextLoads7() throws Exception {
        myService.sendMessage("direct_order_exchange","email","我是发送者");

        Thread.sleep(5000);
    }

为何要沉睡5秒呢?因为使用这种单元测试方式,程序一运行完就会立即关闭应用,而回调函数的执行会有延迟,故为了保证能收到消息确认而设置了沉睡一下再关闭程序。

成功测试:

失败测试:

三、消费者端消息确认机制 3.1 引入依赖

3.2 application.yml
# 服务端口
server:
  port: 8081
# 配置rabbitmq服务
spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    addresses: 192.168.229.128:5672
#    host: 192.168.229.128
#    port: 5672

3.3 Exchange 和 Queue

若生产者端未配置,则需要配置;这里生产者已经配置了,故这里无需配置。

3.4 消息接收确认 3.4.1 默认ack

代码:

package com.service.direct;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailService {
    @RabbitHandler
    public void messagerevice(String msg, Channel channel, Message message){
        // 此处省略发邮件的逻辑
        System.out.println("email-------------->" + message);
//        int i = 10/0;
    }
}

rabbitmq默认的是自动ack,无需添加其他配置。

特性:
消费者自行监控队列,若有队列存在未消费的消息,则进行消费。
若正常消费成功了,则会自动返回确认ack给队列,队列收到后即可将消息移除。
若消费过程中出现异常,则会触发重试消费,若一直报错则会出现死循环。

问题:使用自动ack时,如何解决出现死循环的情况?
方案一:控制重试次数
修改配置文件

spring:
  rabbitmq:
    username: guest
    password: guest
    virtual-host: /
    addresses: 192.168.229.128:5672
#    host: 192.168.229.128
#    port: 5672

    listener:
      simple:
#        acknowledge-mode: manual # 设置消费端手动 ack
        retry:
          enabled: true # 是否支持重试
          max-attempts: 10 #最大重试次数
          initial-interval: 2000ms #重试时间间隔

测试后,可发现重试了10次就停止了,但队列的消息也被移除掉了,会造成消息丢失。

方案二:控制重试次数+死信队列
加上死信队列,可以实现重试次数结束后,队列会将消息转移到死信队列,从而不会造成消息丢失。

3.4.2 手动ack

开启方式简单,只需要放开此配置即可

acknowledge-mode: manual # 设置消费端手动 ack

消费消息有三种回执方法,我们来分析一下每种方法的含义。

1、basicAck
basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

2、basicNack
basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列。

3、basicReject
basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列。

3.4.3 测试
package com.service.direct;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.io.IOException;

// bindings其实就是用来确定队列和交换机绑定关系
@RabbitListener(queues = {"email.direct.queue"})
@Component
public class DirectEmailService {
    @RabbitHandler
    public void messagerevice(String msg, Channel channel, Message message) throws IOException {
        try {
            // 此处省略发邮件的逻辑
            System.out.println("email-------------->" + message);
            int i = 10/0;

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){

            //注意:参数三若设置为true,会出现死循环
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
    }
}

注意:
1、basicNack方法的参数三设置为true会造成死循环,true是允许重试。
2、使用了手动ack后,重试次数将不起作用
3、basicNack方法执行后,消息会被移除,若存在死信队列则转移到死信队列,反之则造成消息丢失。

四、参考

https://blog.csdn.net/xinzhifu1/article/details/107016179

https://www.cnblogs.com/biehongli/p/11789098.html

https://www.cnblogs.com/haixiang/p/10900005.html#%E7%94%9F%E4%BA%A7%E7%AB%AF-/confirm/i-%E6%B6%88%E6%81%AF%E7%A1%AE%E8%AE%A4%E6%9C%BA%E5%88%B6

https://blog.csdn.net/dh554112075/article/details/90137869

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/327141.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号