1.生成者不知道消息是否真正到达broker(/confirm/i模式)
(1)普通/confirm/i模式:同步确认发布,publish一条消息后,等待服务器端/confirm/i,如果服务端返回false或者超时时间内未返回,客户端进行消息重传
channel./confirm/iSelect();//开启发布确认
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
if(!channel.waitFor/confirm/is()){
System.out.println("send message failed.");
}
(2)批量/confirm/i模式:同步确认发布,每发送一批消息后,调用waitFor/confirm/is()方法,等待服务器端confirm
channel./confirm/iSelect();
for(int i=0;i outstandingConfirms = new ConcurrentSkipListMap<>();
/confirm/iCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(sequenceNumber, true);
//清除该部分未确认消息
/confirm/ied.clear();
} else {
//只清除当前序列号的消息
outstanding/confirm/is.remove(sequenceNumber);
}
};
/confirm/iCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstanding/confirm/is.get(sequenceNumber);
System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
};
channel.add/confirm/iListener(ackCallback, nackCallback);
long begin = System.currentTimeMillis();
for (int i = 0; i < 1000; i++) {
String message = "message:" + i;
outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + 1000 + "个异步确认消息,耗时" + (end - begin) + "ms");
}
}
2.保证消息从队列可靠地到达消费者(关闭自动消息确认,进行手动ack),
public class Consumer {
private final static String QUEUE_NAME = "TEST_QUEUE";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
//手动ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
};
System.out.println("Consumer等待消费......");
//关闭自动消息确认 autoAck = false;
channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
}
}
发布确认高级:
1.投递到交换机失败,投递失败后应该通知消息生产者(确认回调)
2.交换机投递成功但是消息路由到队列失败(回退回调)
springboot版本:
(1)配置文件
spring.rabbitmq.publisher-/confirm/i-type=correlated
NONE:禁用发布确认模式,是默认值
CORRELATED:发布消息成功到交换器后会触发回调方法
SIMPLE:略
spring.rabbitmq.host=192.168.196.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-/confirm/i-type=correlated
(2)交换机、队列配置类
@Configuration
public class /confirm/iConfig {
public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";
//声明交换机
@Bean("/confirm/iExchange")
public DirectExchange /confirm/iExchange() {
return new DirectExchange(/confirm/i_EXCHANGE_NAME);
}
//声明队列
@Bean("/confirm/iQueue")
public Queue /confirm/iQueue() {
return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
}
//绑定交换机与队列
@Bean
public Binding queueBinding(@Qualifier("/confirm/iQueue") Queue queue,
@Qualifier("/confirm/iExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("key");
}
}
(3)实现确认回调/confirm/iCallback
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate./confirm/iCallback {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
}
(4)生产者
@RestController
@RequestMapping("//confirm/i")
@Slf4j
public class ProducerController {
public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
//依赖注入rabbitTemplate并设置它的回调对象
@PostConstruct
public void init() {
rabbitTemplate.set/confirm/iCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
rabbitTemplate.convertAndSend(/confirm/i_EXCHANGE_NAME, "key", message, new CorrelationData("1"));
}
}
(5)消费者
@Component
@Slf4j
public class /confirm/iConsumer {
public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";
@RabbitListener(queues = /confirm/i_QUEUE_NAME)
public void receiveMsg(Message message) {
log.info("接受到队列 /confirm/i.queue 消息:{}", new String(message.getBody()));
}
}
测试:
http://localhost:8080//confirm/i/sendMessage/你好鸭
开启了生产者确认机制的情况下,交换机接收到消息后,发现该消息不可路由,那么消息会被直接丢弃
解决办法:使用ReturnsCallback回调
(1)配置文件
添加:spring.rabbitmq.publisher-returns=true
spring.rabbitmq.host=192.168.196.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-/confirm/i-type=correlated
spring.rabbitmq.publisher-returns=true
(2)实现回退回调ReturnsCallback
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机已经收到 id 为:{}的消息", id);
} else {
log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
System.err.println("ReturnedMessage: " + returned);
}
}
(3)依赖注入rabbitTemplate并设置它的回调对象
@PostConstruct
public void init() {
//设置确认回调
rabbitTemplate.set/confirm/iCallback(myCallBack);
rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理
rabbitTemplate.setReturnsCallback(myCallBack);
}
备份交换机:如果回退回调与备份交换机同时使用,备份交换机优先级高
当消息路由失败时,可以在该交换机处设置备份交换机,将路由失败的消息路由到备份交换机上
//声明交换机并指明其备份交换机
@Bean("/confirm/iExchange")
public DirectExchange /confirm/iExchange() {
ExchangeBuilder exchangeBuilder =
ExchangeBuilder.directExchange(/confirm/i_EXCHANGE_NAME)
.durable(true)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);//指明备份交换机
return (DirectExchange) exchangeBuilder.build();
}