栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ之消息确认机制ACK

RabbitMQ之消息确认机制ACK

消息确认机制(ack)

队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。
RabbitMQ默认的消息确认机制是:自动确认的 。

修改为手动确认模式,然后不手动确认看看结果
在application.yml中

spring:
  rabbitmq:
    port: 5672
    host: 127.0.0.1
    username: guest
    password: guest
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: manual # 开启手动确认,自动是auto
package com.yzm.rabbitmq_02.config;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    public static final String ACK_QUEUE = "ack_queue";

    
    @Bean
    public Queue queue() {
        return QueueBuilder.durable(ACK_QUEUE).build();
    }
}
package com.yzm.rabbitmq_02.sender;

import com.yzm.rabbitmq_02.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class AckSender {

    private final AmqpTemplate template;

    public AckSender(AmqpTemplate template) {
        this.template = template;
    }

    @GetMapping("/send")
    public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
        for (int i = 1; i <= 10; i++) {
            String msg = message + " ..." + i;
            System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
            template.convertAndSend(RabbitConfig.ACK_QUEUE, msg);
        }
    }
}
package com.yzm.rabbitmq_02.receiver;

import com.yzm.rabbitmq_02.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class AckReceiver {

    private int count1 = 1;
    private int count2 = 1;

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive1(Message message) throws InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
    }

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive2(Message message) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
    }
}

运行结果:

消费者1号、2号分别拿到一条消息进行消费,但没有确认,处于阻塞状态,所以队列不会移除这两条消息,同时设置了prefetch=1,在消费者未确认之前不会重新推送消息给消费者

停止程序,发现2条未确认的消息会回到Ready里面等待重新消费

再次重启,再次消费2条消息,但仍未确认

访问/send,再次发布消息,消息堆积

好了,来看看如何手动确认吧。修改消费者

package com.yzm.rabbitmq_02.receiver;

import com.rabbitmq.client.Channel;
import com.yzm.rabbitmq_02.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;


@Component
public class AckReceiver {

    private int count1 = 1;
    private int count2 = 1;
    private int count3 = 1;

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive1(
            Message message, Channel channel) throws IOException, InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);

        // 确认消息
        // 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)
        // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive2(
            Message message, Channel channel,
            @Headers Map map) throws IOException, InterruptedException {
        Thread.sleep(600);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);

        // 确认消息
        channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
    }

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive3(
            Message message, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);

        // 确认消息
        channel.basicAck(deliveryTag, false);
    }
}

刚启动,就把前两次积累的消息先被消费完

接着发布消息

手动确认通过调用方法实现
basicAck(long deliveryTag, boolean multiple)
deliveryTag:交付标签,相当于消息ID 64位的长整数(从1开始递增)
multiple:false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签

能手动确认,同样也可以手动拒绝,修改消费者

@Component
public class AckReceiver {

    private int count1 = 1;
    private int count2 = 1;
    private int count3 = 1;

//    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive1(
            Message message, Channel channel) throws IOException, InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);

        // 确认消息
        // 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)
        // 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

//    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive2(
            Message message, Channel channel,
            @Headers Map map) throws IOException, InterruptedException {
        Thread.sleep(600);
        System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);

        // 确认消息
        channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
    }

//    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive3(
            Message message, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {
        Thread.sleep(1000);
        System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);

        // 确认消息
        channel.basicAck(deliveryTag, false);
    }

    @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
    public void receive4(
            Message message, Channel channel) throws IOException, InterruptedException {
        Thread.sleep(200);
        System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'");
        System.out.println(" [ 消费者@4号 ] 消息被我拒绝了:" + count1++);
        
        // 拒绝消息方式一
        // 第一个参数,交付标签
        // 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签
        // 第三个参数,false表示直接丢弃消息,true表示重新排队
        //channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

        // 拒绝消息方式二
        // 第一个参数,交付标签
        // 第二个参数,false表示直接丢弃消息,true表示重新排队
        // 跟basicNack的区别就是始终只拒绝提供的交付标签
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
    }
}

运行结果:

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
这里是拒绝后,重新进入队列,所以消费的总是第一条消息并且循环不停
停止程序后,队列仍然是10条消息

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
改成false,拒绝后直接丢弃
重启后:

总结一下 手动确认模式的各种情况
未确认:什么也不用写,消息不会移除,重复消费,积攒越来越多
确认:channel.basicAck();确认后,消息从队列中移除
拒绝:channel.basicNack()或channel.basicReject();拒绝后,消息先从队列中移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)

相关链接

首页
上一篇:快速入门
下一篇:交换机

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/600272.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号