队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。
RabbitMQ默认的消息确认机制是:自动确认的 。
修改为手动确认模式,然后不手动确认看看结果
在application.yml中
spring:
rabbitmq:
port: 5672
host: 127.0.0.1
username: guest
password: guest
listener:
simple:
prefetch: 1
acknowledge-mode: manual # 开启手动确认,自动是auto
package com.yzm.rabbitmq_02.config;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
public static final String ACK_QUEUE = "ack_queue";
@Bean
public Queue queue() {
return QueueBuilder.durable(ACK_QUEUE).build();
}
}
package com.yzm.rabbitmq_02.sender;
import com.yzm.rabbitmq_02.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AckSender {
private final AmqpTemplate template;
public AckSender(AmqpTemplate template) {
this.template = template;
}
@GetMapping("/send")
public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {
for (int i = 1; i <= 10; i++) {
String msg = message + " ..." + i;
System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");
template.convertAndSend(RabbitConfig.ACK_QUEUE, msg);
}
}
}
package com.yzm.rabbitmq_02.receiver;
import com.yzm.rabbitmq_02.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class AckReceiver {
private int count1 = 1;
private int count2 = 1;
@RabbitListener(queues = RabbitConfig.ACK_QUEUE)
public void receive1(Message message) throws InterruptedException {
Thread.sleep(200);
System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
}
@RabbitListener(queues = RabbitConfig.ACK_QUEUE)
public void receive2(Message message) throws InterruptedException {
Thread.sleep(1000);
System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
}
}
运行结果:
消费者1号、2号分别拿到一条消息进行消费,但没有确认,处于阻塞状态,所以队列不会移除这两条消息,同时设置了prefetch=1,在消费者未确认之前不会重新推送消息给消费者
停止程序,发现2条未确认的消息会回到Ready里面等待重新消费
再次重启,再次消费2条消息,但仍未确认
访问/send,再次发布消息,消息堆积
好了,来看看如何手动确认吧。修改消费者
package com.yzm.rabbitmq_02.receiver;
import com.rabbitmq.client.Channel;
import com.yzm.rabbitmq_02.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class AckReceiver {
private int count1 = 1;
private int count2 = 1;
private int count3 = 1;
@RabbitListener(queues = RabbitConfig.ACK_QUEUE)
public void receive1(
Message message, Channel channel) throws IOException, InterruptedException {
Thread.sleep(200);
System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
// 确认消息
// 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)
// 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = RabbitConfig.ACK_QUEUE)
public void receive2(
Message message, Channel channel,
@Headers Map map) throws IOException, InterruptedException {
Thread.sleep(600);
System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
// 确认消息
channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
}
@RabbitListener(queues = RabbitConfig.ACK_QUEUE)
public void receive3(
Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {
Thread.sleep(1000);
System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);
// 确认消息
channel.basicAck(deliveryTag, false);
}
}
刚启动,就把前两次积累的消息先被消费完
接着发布消息
手动确认通过调用方法实现
basicAck(long deliveryTag, boolean multiple)
deliveryTag:交付标签,相当于消息ID 64位的长整数(从1开始递增)
multiple:false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
能手动确认,同样也可以手动拒绝,修改消费者
@Component
public class AckReceiver {
private int count1 = 1;
private int count2 = 1;
private int count3 = 1;
// @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
public void receive1(
Message message, Channel channel) throws IOException, InterruptedException {
Thread.sleep(200);
System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);
// 确认消息
// 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)
// 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
// @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
public void receive2(
Message message, Channel channel,
@Headers Map map) throws IOException, InterruptedException {
Thread.sleep(600);
System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);
// 确认消息
channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);
}
// @RabbitListener(queues = RabbitConfig.ACK_QUEUE)
public void receive3(
Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {
Thread.sleep(1000);
System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);
// 确认消息
channel.basicAck(deliveryTag, false);
}
@RabbitListener(queues = RabbitConfig.ACK_QUEUE)
public void receive4(
Message message, Channel channel) throws IOException, InterruptedException {
Thread.sleep(200);
System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'");
System.out.println(" [ 消费者@4号 ] 消息被我拒绝了:" + count1++);
// 拒绝消息方式一
// 第一个参数,交付标签
// 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签
// 第三个参数,false表示直接丢弃消息,true表示重新排队
//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
// 拒绝消息方式二
// 第一个参数,交付标签
// 第二个参数,false表示直接丢弃消息,true表示重新排队
// 跟basicNack的区别就是始终只拒绝提供的交付标签
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
运行结果:
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
这里是拒绝后,重新进入队列,所以消费的总是第一条消息并且循环不停
停止程序后,队列仍然是10条消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
改成false,拒绝后直接丢弃
重启后:
相关链接总结一下 手动确认模式的各种情况
未确认:什么也不用写,消息不会移除,重复消费,积攒越来越多
确认:channel.basicAck();确认后,消息从队列中移除
拒绝:channel.basicNack()或channel.basicReject();拒绝后,消息先从队列中移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)
首页
上一篇:快速入门
下一篇:交换机



