Consumer acknowledgements and Publisher Confirms大家好,我是河海哥,专注于后端,如果可以的话,想做一名code designer而不是普通的coder,一起见证河海哥的成长,您的评论与赞是我的最大动力。
今天主要整理下,消费端确认和发布者确认,好多blog把consumer确认和publisher确认混起来了。。就挺尴尬的。
1:consumer acknowledgement 1-1:consumer确认的目的在哪?由于网络、程序出错、等原因,从rabbitmq 节点 deliver进conusmer的时候,可能会出现问题,比如网络波动了,消费者都没收到这个消息;比如,代码跑到一半应用挂了;此时message就会丢失,导致message无法正确被消费。那么此时,就得有一个mechanism,来确保message正确被投递进consumer中,并且成功被消费。
1-2:两种确认模式
- 自动确认模式(automatic acknowledgement model):当RabbbitMQ将消息发送给应用后,消费者端默认回送一个确认消息,此时RabbitMQ删除此消息。
- 手动确认模式(explicit acknowledgement model):消费者收到消息后,可以在执行一些逻辑后,消费者自己决定什么时候发送确认回执(acknowledgement),RabbitMQ收到回执后才删除消息,这样就保证消费端不会丢失消息
https://juejin.cn/post/6844903551886426120
很明显,自动确认模式基本上不用,因为默认消费者是完成消费的,他不会管消费者是否真的完成了。一般选择第二种手动确认。
手动确认主要有三个方法。multiple,是批量的记号,比如tag = 8,那么8之前的所有都会确认。一般不用这个,因为你怎么能替别人做主?requeue代表reject了 message是否重新入队。
void basicAck(long deliveryTag, boolean multiple) throws IOException; void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; void basicReject(long deliveryTag, boolean requeue) throws IOException;1-3:delivery tag
rabbitmq 用delivery tag对message进行确认编号,当节点收到了consumer端的应答,就可以将message进行丢弃处理。所以说,deliver tag和publisher是没有任何关系的。
- delivery tag 是一个auto increment 值,来一个消息就加1。
- delivery tag 作用范围是每一个channel的。
后面会进行验证。
1-4:springboot demo这里使用Direct exchange,同时绑定一个队列,以及两个consumer,主要是测试不同channel的delivery tag是单独的。
- RabbitmqConfig.java
package com.example.springbootrabbitmq.configuration;
import com.example.springbootrabbitmq.My/confirm/iCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitmqConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
//自动装配消息监听器所在的容器工厂配置类实例
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
// @Bean
// public MessageConverter jsonMessageConverter() {
// return new Jackson2JsonMessageConverter();
// }
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.set/confirm/iCallback(new My/confirm/iCallback());
// rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
// if (ack) {
log.info("发送成功");
// } else {
correlationData.getReturned().getMessage().getMessageProperties().getMessageId();
log.info("发送失败");
// }
// });
return rabbitTemplate;
}
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer() {
//定义消息监听器所在的容器工厂
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//设置容器工厂所用的实例
factory.setConnectionFactory(connectionFactory);
//设置消息在传输中的格式,在这里采用JSON的格式进行传输
factory.setMessageConverter(new Jackson2JsonMessageConverter());
// //设置并发消费者实例的初始数量。在这里为1个
// factory.setConcurrentConsumers(1);
// //设置并发消费者实例的最大数量。在这里为1个
// factory.setMaxConcurrentConsumers(1);
// //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
// factory.setPrefetchCount(1);
// 关闭自动应答
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置不公平分发,更改为每次读取1条消息,在消费者未回执确认之前,不在进行下一条消息的投送,而不是默认的轮询;
// 也就是说,我处理完了,我再接受下一次的投递,属于消费者端的控制
// 不设置的话,就是采用轮询的方法去监听队列,你一条我一条
factory.setPrefetchCount(1);
return factory;
}
}
- DirectRabbitConfig.java,配置了两个队列,其实只用到了一个。
@Configuration
public class DirectRabbitConfig {
//队列 起名:springDirectQueue
@Bean
public Queue springDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,不然你电脑关机的时候队列就会消失。
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("springDirectQueue", true);
}
@Bean
public Queue springDirectQueue2() {
return new Queue("springDirectQueue2", true);
}
//Direct交换机 起名:springDirectExchange
@Bean
public DirectExchange TestDirectExchange() {
return new DirectExchange("springDirectExchange", true, false);
}
//用两个队列,将队列和交换机绑定, 并设置用于匹配键:springDirectRouting1, springDirectRouting2,用来实验routingKey的作用
@Bean
public Binding bindingDirectQueue1() {
return BindingBuilder.bind(springDirectQueue()).to(TestDirectExchange()).with("springDirectRouting1");
}
@Bean
public Binding bindingDirectQueue2() {
return BindingBuilder.bind(springDirectQueue2()).to(TestDirectExchange()).with("springDirectRouting2");
}
}
- DirectProducerController.java
package com.example.springbootrabbitmq.controller;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
@RestController
@RequestMapping("/directProducer")
@Slf4j
public class DirectProducerController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage")
public void sendMessage(String msg) {
Map msgMap = new HashMap<>();
msgMap.put("message", msg);
String messageJson = JSONObject.toJSONString(msgMap);
Message message = MessageBuilder
.withBody(messageJson.getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_JSON)
.setContentEncoding("utf-8")
.setMessageId(UUID.randomUUID() + "")
.build();
log.info("生产者发送:" + new String(message.getBody(), StandardCharsets.UTF_8));
CorrelationData correlationData = new CorrelationData();
correlationData.setId("1");
rabbitTemplate.convertAndSend("springDirectExchange", "springDirectRouting1", message, correlationData);
}
}
- DirectConsumerController.java
package com.example.springbootrabbitmq.controller;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
@RestController
@RequestMapping("/directConsumer")
@Slf4j
public class DirectConsumerController {
@RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
public void processMessageQueue(Message message, Channel channel) throws Exception {
int channelNumber = channel.getChannelNumber();
log.info("channelNumber为:{}", channelNumber);
String messageString = new String(message.getBody(), StandardCharsets.UTF_8);
String messageId = message.getMessageProperties().getMessageId();
log.info("springDirectQueue_consumer1消费者接受:" + messageString);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("deliveryTag:" + deliveryTag);
channel.basicAck(deliveryTag, false);
}
@RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
public void processMessageQueue2(Message message, Channel channel) throws Exception {
int channelNumber = channel.getChannelNumber();
log.info("channelNumber为:{}", channelNumber);
String messageString = new String(message.getBody(), StandardCharsets.UTF_8);
String messageId = message.getMessageProperties().getMessageId();
log.info("springDirectQueue_consumer2消费者接受:" + messageString);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
log.info("deliveryTag:" + deliveryTag);
channel.basicAck(deliveryTag, false);
}
}
-
ouput
2021-10-22 16:29:39.566 INFO 11264 --- [nio-8021-exec-2] c.e.s.c.DirectProducerController : 生产者发送:{"message":"测试directExchange"} 2021-10-22 16:29:39.585 INFO 11264 --- [nectionFactory1] c.e.s.My/confirm/iCallback : 确认的消息id为:1 2021-10-22 16:29:39.620 INFO 11264 --- [ntContainer#7-1] c.e.s.c.DirectConsumerController : channelNumber为:12 2021-10-22 16:29:39.622 INFO 11264 --- [ntContainer#7-1] c.e.s.c.DirectConsumerController : springDirectQueue_consumer1消费者接受:{"message":"测试directExchange"} 2021-10-22 16:29:39.622 INFO 11264 --- [ntContainer#7-1] c.e.s.c.DirectConsumerController : deliveryTag:1 2021-10-22 16:29:39.626 INFO 11264 --- [ntContainer#8-1] c.e.s.c.DirectConsumerController : channelNumber为:13 2021-10-22 16:29:39.626 INFO 11264 --- [ntContainer#8-1] c.e.s.c.DirectConsumerController : springDirectQueue_consumer2消费者接受:{"message":"测试directExchange"} 2021-10-22 16:29:39.626 INFO 11264 --- [ntContainer#8-1] c.e.s.c.DirectConsumerController : deliveryTag:1首先conusmer1接收到这个消息,channelNumber是12,deliveryTag默认从1开始,所以是1,之后因为reject,requeue=true,重新进入队列,被consumer2消费,consumer2的channelNumber13,deliveryTag也是1。说明channel不同,deliveryTag也是不同的。
-
如果忘记了ack,会是什么结果。
RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。 以前测试过,需要等待很久消息才会被投递到另一个consumer中。可以试试,那么这个问题怎么解决呢?这个就是queue出现了大量unack,我还没接触到 = =||,我也不晓得,哈哈哈哈。
发布确认,主要是发送方确定message 已经成功deliver 进broker 中,也就是publisher和broker之间的/confirm/i。
Using standard AMQP 0-9-1, the only way to guarantee that a message isn’t lost is by using transactions – make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced. It mimics the consumer acknowledgements mechanism already present in the protocol.
官网的这段话解释了,发布确认的两种途径,一种是事务,一种是acknowledge mechanism。事务是重量级的并且会减少吞吐量,所以更倾向于confim mechanism,从官网的理解来看,似乎client和broker之间也是通过类似于consumer端确认的方法,broker 通过ack,nack方法来通过deliveryTag来保证消息是否已经投递到broker中,可能是我们专注于producer和consumer看不到这层。(是否真的如此待考证,这是我的猜测)
2-2:什么时候broker进行消息确认发送方消息确认主要有两个部分构成
-
发送方到broker中的exchange。
-
broker内部exchange,把消息投递到queue中。
-
Broker对于没有成功从exchange投递到指定的队列的时候,会发送一个/confirm/i。如果配置了消息会退,会将消息退回到客户端。
-
对于成功路由到指定队列的时候,会发送一个ack,如果队列持久化,会在持久化成功之后,再回复一个确认消息。
由这两点可以看出,第一种只要消息成功到了exchange上就代表确认了。第二种是在持久化之后/confirm/i。强调一点要保证消息持久化,rabbitmq一定要启用/confirm/i,不然会丢失。没有/confirm/i,无法确认message是否真的投递成功。
2-3:/confirm/i的模式 2-3-1:springboot中basic.ack for a persistent message routed to a durable queue will be sent after persisting the message to disk
publisher-/confirm/i-type
- NONE值是禁用发布确认模式,是默认值
- CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
- SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitFor/confirm/is或waitFor/confirm/isOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitFor/confirm/isOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
springboot中一般就配置CORRELATED,即可,使用的是异步确认。
2-3-2:原生rabbitmq-
串行/confirm/i模式:producer每发送一条消息后,调用waitForConfirms()方法,等待broker端/confirm/i,如果服务器端返回false或者在超时时间内未返回,客户端进行消息重传。
-
批量/confirm/i模式:producer每发送一批消息后,调用waitForConfirms()方法,等待broker端/confirm/i。
-
异步/confirm/i模式:提供一个回调方法,broker /confirm/i了一条或者多条消息后producer端会回调这个方法。
事实证明确实是异步确认效率更高。
springboot中,主要靠publisher-/confirm/i-type,publisher-returns,mandatory 这三个参数来实现,我们通过demo来加深认识。
- RabbitmqConfig
package com.xx;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@Slf4j
public class RabbitmqConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.set/confirm/iCallback(new My/confirm/iCallback());
rabbitTemplate.setReturnsCallback(new MyReturnCallback());
rabbitTemplate.setMandatory(true);
return rabbitTemplate;
}
}
-
DirectRabbitConfig.java
package com.xx; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectRabbitConfig { //队列 起名:springDirectQueue @Bean public Queue springDirectQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,不然你电脑关机的时候队列就会消失。 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("test_queue_1", true); } //Direct交换机 起名:springDirectExchange @Bean public DirectExchange TestDirectExchange() { return new DirectExchange("test_exchange_1", true, false); } //用两个队列,将队列和交换机绑定, 并设置用于匹配键:springDirectRouting1, springDirectRouting2,用来实验routingKey的作用 @Bean public Binding bindingDirectQueue1() { return BindingBuilder.bind(springDirectQueue()).to(TestDirectExchange()).with("testRouting1"); } } -
DirectProducerController
package com.xx; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.UUID; @RestController @RequestMapping("/directProducer") @Slf4j public class DirectProducerController { @Resource private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessageByExchangeAndRoutingKey") public void sendMessageByExchangeAndRoutingKey(String msg, String exchange, String routingKey) { MapmsgMap = new HashMap<>(); msgMap.put("message", msg); String messageJson = JSONObject.toJSONString(msgMap); Message message = MessageBuilder .withBody(messageJson.getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_JSON) .setContentEncoding("utf-8") .setMessageId(UUID.randomUUID() + "") .build(); log.info("生产者发送:" + new String(message.getBody(), StandardCharsets.UTF_8)); rabbitTemplate.convertAndSend(exchange, routingKey, message); } } -
DirectConsumer
package com.xx; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @Slf4j @Component public class DirectConsumer { @RabbitListener(queues = "test_queue_1") public void processMessageQueue(Message message, Channel channel) throws Exception { String messageString = new String(message.getBody(), StandardCharsets.UTF_8); log.info("--------springDirectQueue_consumer1接受:{} --------", messageString); int channelNumber = channel.getChannelNumber(); log.info("--------springDirectQueue_consumer1-channelNumber为:{}", channelNumber); long deliveryTag = message.getMessageProperties().getDeliveryTag(); log.info("--------springDirectQueue_consumer1-deliveryTag:" + deliveryTag); TimeUnit.SECONDS.sleep(3); channel.basicAck(deliveryTag, false); // channel.basicNack(deliveryTag, false, true); log.info("--------springDirectQueue_consumer1处理结束--------"); } } -
My/confirm/iCallback
package com.xx; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class My/confirm/iCallback implements RabbitTemplate./confirm/iCallback { @Override public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) { // if (correlationData != null) { // String id = correlationData.getId(); // log.info("确认的消息id为:" + id); // } if (ack) { log.info("ack from broker."); } else { log.info("nack from broker. The reason is {}", cause); } } } -
MyReturnCallback
package com.xx; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; @Slf4j @Component public class MyReturnCallback implements RabbitTemplate.ReturnsCallback { @Override public void returnedMessage(ReturnedMessage returned) { log.info("Haven't reach target queue, received message {}", returned.getMessage().toString()); } }
request:
http://localhost:8022/directProducer/sendMessageByExchangeAndRoutingKey?msg=测试directExchange&exchange=test_exchange_1&routingKey=testRouting1
-
首先我们发送一条正常的消息:
2021-10-27 21:21:54.590 INFO 59556 --- [nio-8022-exec-3] com.xx.DirectProducerController : 生产者发送:{"message":"测试directExchange"} 2021-10-27 21:21:54.601 INFO 59556 --- [nectionFactory2] com.xx.My/confirm/iCallback : ack from broker. 2021-10-27 21:21:54.614 INFO 59556 --- [ntContainer#0-1] com.xx.DirectConsumer : --------springDirectQueue_consumer1接受:{"message":"测试directExchange"} -------- 2021-10-27 21:21:54.614 INFO 59556 --- [ntContainer#0-1] com.xx.DirectConsumer : --------springDirectQueue_consumer1-channelNumber为:1 2021-10-27 21:21:54.614 INFO 59556 --- [ntContainer#0-1] com.xx.DirectConsumer : --------springDirectQueue_consumer1-deliveryTag:1 2021-10-27 21:21:57.617 INFO 59556 --- [ntContainer#0-1] com.xx.DirectConsumer : --------springDirectQueue_consumer1处理结束--------
进入了/confirm/i回调接口,说明message投递到broker是正常的。
-
如果没有相匹配的队列,这个时候回调接口,就会有用。publisher-returns,会将没有匹配到具体队列的消息,返回给client端。我们再发送一个routingkey不一致,匹配不到队列的情况。
2021-10-27 21:15:03.362 INFO 57786 --- [nio-8022-exec-9] com.xx.DirectProducerController : 生产者发送:{"message":"测试directExchange"} 2021-10-27 21:15:03.364 INFO 57786 --- [nectionFactory4] com.xx.MyReturnCallback : Haven't reach target queue, received message (Body:'{"message":"测试directExchange"}' MessageProperties [headers={}, messageId=c738202c-7d4c-4c79-b06e-6240f26a6979, contentType=application/json, contentEncoding=utf-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]) 2021-10-27 21:15:03.365 INFO 57786 --- [nectionFactory5] com.xx.My/confirm/iCallback : Publisher: Message has been confirmed by broker.正如之前所说,没有被成功投递到队列,但是已经进入了相应的exchange,也会被broker comfirm并且是ack。同时,消息退回在没有成功投递到队列的时候,触发publisher-returns 回调接口。
-
如果连exchange都没有成功呢?我们修改exchange。
2021-10-27 21:21:32.677 INFO 59556 --- [nio-8022-exec-1] com.xx.DirectProducerController : 生产者发送:{"message":"测试directExchange"} 2021-10-27 21:21:32.692 ERROR 59556 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'test_exchange1' in vhost '/', class-id=60, method-id=40) 2021-10-27 21:21:32.694 INFO 59556 --- [nectionFactory2] com.xx.My/confirm/iCallback : nack from broker. The reason is channel error; protocol method: #method (reply-code=404, reply-text=NOT_FOUND - no exchange 'test_exchange1' in vhost '/', class-id=60, method-id=40) 直接被broker nack了,走的是/confirm/i回调接口。但是无法再次拿到这个message,发送的时候可以用CorrelationData 拿回消息。
参考:https://blog.csdn.net/yaomingyang/article/details/106857104
3:alternative exchange 备份交换机上述提到,当消息没有递送到queue的时候,可以通过mandatory、publisher-return 这两个参数将message回退到client端,我们还可以通过alternative exchange进行兜底工作,这样由于routingkey匹配不上而没有deliver进queue的message,就可以投递到备份交换机中,进行消费。
3-1:springboot-测试demo我们接着上面的代码,加入备份交换机。
-
AlternativeExchangeConfig
package com.xx; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class AlternativeExchangeConfig { public static final String ALTERNATIVE_EXCHANGE_NAME = "alternative_exchange_name"; public static final String ALTERNATIVE_QUEUE_NAME = "alternative_queue_name"; public static final String ALTERNATIVE_ROUTING_KEY = "alternative_routing_key"; @Bean("alternativeQueue") public Queue alternativeQueue() { return new Queue(ALTERNATIVE_QUEUE_NAME, true, false, false); } @Bean("alternativeExchange") public FanoutExchange alternativeExchange() { return new FanoutExchange(ALTERNATIVE_EXCHANGE_NAME, true, false); } @Bean public Binding bindingAlternativeQueue(@Qualifier("alternativeQueue") Queue queue, @Qualifier("alternativeExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } } -
在原excahnge中声明备份交换机。
@Bean public DirectExchange TestDirectExchange() { Maparguments = new HashMap<>(); //如果设置此参数,TestAE交换器绑定的路由没有匹配到就会发送到exchange-unroute交换器所绑定的队列进行消费 arguments.put("alternate-exchange", AlternativeExchangeConfig.ALTERNATIVE_EXCHANGE_NAME); return new DirectExchange("test_exchange_1", true, false, arguments); } -
备份交换机的监听者
package com.xx; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @Component @Slf4j public class AlternativeConsumer { @RabbitListener(queues = AlternativeExchangeConfig.ALTERNATIVE_QUEUE_NAME) public void alternativeProcess(Message message, Channel channel) throws Exception { String messageString = new String(message.getBody(), StandardCharsets.UTF_8); log.info("--------alternative_consumer1接受:{} --------", messageString); int channelNumber = channel.getChannelNumber(); log.info("--------alternative_consumer1-channelNumber为:{}", channelNumber); long deliveryTag = message.getMessageProperties().getDeliveryTag(); log.info("--------alternative_consumer1-deliveryTag:" + deliveryTag); TimeUnit.SECONDS.sleep(3); channel.basicAck(deliveryTag, false); // channel.basicNack(deliveryTag, false, true); log.info("--------alternative_consumer1处理结束--------"); } } -
测试:记得更改配置要先删除原来的,不然不能成功。
这是原来走的消息回退的接口。
2021-10-28 14:53:54.005 INFO 2801 --- [nio-8022-exec-2] com.xx.DirectProducerController : 生产者发送:{"message":"测试directExchange"} 2021-10-28 14:53:54.018 INFO 2801 --- [nectionFactory2] com.xx.My/confirm/iCallback : ack from broker. 2021-10-28 14:53:54.017 INFO 2801 --- [nectionFactory1] com.xx.MyReturnCallback : Haven't reach target queue, received message (Body:'{"message":"测试directExchange"}' MessageProperties [headers={}, messageId=b2d7d943-e309-4c17-b5d5-2c3b14c5c376, contentType=application/json, contentEncoding=utf-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])这是现在走备份交换机的输出。
2021-10-28 15:11:23.106 INFO 3046 --- [nio-8022-exec-2] com.xx.DirectProducerController : 生产者发送:{"message":"测试directExchange"} 2021-10-28 15:11:23.118 INFO 3046 --- [nectionFactory1] com.xx.My/confirm/iCallback : ack from broker. 2021-10-28 15:11:23.122 INFO 3046 --- [ntContainer#0-1] com.xx.AlternativeConsumer : --------alternative_consumer1接受:{"message":"测试directExchange"} -------- 2021-10-28 15:11:23.123 INFO 3046 --- [ntContainer#0-1] com.xx.AlternativeConsumer : --------alternative_consumer1-channelNumber为:1 2021-10-28 15:11:23.123 INFO 3046 --- [ntContainer#0-1] com.xx.AlternativeConsumer : --------alternative_consumer1-deliveryTag:1 2021-10-28 15:11:26.124 INFO 3046 --- [ntContainer#0-1] com.xx.AlternativeConsumer : --------alternative_consumer1处理结束--------可以清楚的看到,原来的消息回退设置直接被覆盖了,直接走的交换机,这个也是要留意一下,会进行覆盖的。
为什么我们要费这么大力气去研究consumer 和puhlisher两端的确认机制,其实这都是防止消息丢失当中的一环,消息丢失一定会是一个极其重要的问题,得好好研究下,下一篇专门分析下消息丢失的过程。河海哥,加油!! 若是有遗漏或者错误部分,还请您指正,非常感谢了!共同学习进步。



