栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 消息确认

RabbitMQ 消息确认

1.生成者不知道消息是否真正到达broker(/confirm/i模式)

(1)普通/confirm/i模式:同步确认发布,publish一条消息后,等待服务器端/confirm/i,如果服务端返回false或者超时时间内未返回,客户端进行消息重传
channel./confirm/iSelect();//开启发布确认
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
if(!channel.waitFor/confirm/is()){
	System.out.println("send message failed.");
}

(2)批量/confirm/i模式:同步确认发布,每发送一批消息后,调用waitFor/confirm/is()方法,等待服务器端confirm
channel./confirm/iSelect();
for(int i=0;i outstandingConfirms = new ConcurrentSkipListMap<>();
        
        /confirm/iCallback ackCallback = (sequenceNumber, multiple) -> {
            if (multiple) {
                //返回的是小于等于当前序列号的未确认消息 是一个 map
                ConcurrentNavigableMap confirmed = outstanding/confirm/is.headMap(sequenceNumber, true);
                //清除该部分未确认消息
                /confirm/ied.clear();
            } else {
                //只清除当前序列号的消息
                outstanding/confirm/is.remove(sequenceNumber);
            }
        };

        /confirm/iCallback nackCallback = (sequenceNumber, multiple) -> {
            String message = outstanding/confirm/is.get(sequenceNumber);
            System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
        };

        
        channel.add/confirm/iListener(ackCallback, nackCallback);

        long begin = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            String message = "message:" + i;
            
            outstanding/confirm/is.put(channel.getNextPublishSeqNo(), message);
            channel.basicPublish("", queueName, null, message.getBytes());
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + 1000 + "个异步确认消息,耗时" + (end - begin) + "ms");
    }
}

2.保证消息从队列可靠地到达消费者(关闭自动消息确认,进行手动ack),
public class Consumer {
    private final static String QUEUE_NAME = "TEST_QUEUE";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:" + receivedMessage);
			//手动ack
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        System.out.println("Consumer等待消费......");
        //关闭自动消息确认 autoAck = false;
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
    }
}
 发布确认高级:
1.投递到交换机失败,投递失败后应该通知消息生产者(确认回调)
2.交换机投递成功但是消息路由到队列失败(回退回调)

springboot版本:
(1)配置文件
spring.rabbitmq.publisher-/confirm/i-type=correlated
NONE:禁用发布确认模式,是默认值
CORRELATED:发布消息成功到交换器后会触发回调方法
SIMPLE:略

spring.rabbitmq.host=192.168.196.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-/confirm/i-type=correlated

(2)交换机、队列配置类
@Configuration
public class /confirm/iConfig {
    public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";
    public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";

    //声明交换机
    @Bean("/confirm/iExchange")
    public DirectExchange /confirm/iExchange() {
        return new DirectExchange(/confirm/i_EXCHANGE_NAME);
    }

    //声明队列
    @Bean("/confirm/iQueue")
    public Queue /confirm/iQueue() {
        return QueueBuilder.durable(/confirm/i_QUEUE_NAME).build();
    }

    //绑定交换机与队列
    @Bean
    public Binding queueBinding(@Qualifier("/confirm/iQueue") Queue queue,
                                @Qualifier("/confirm/iExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key");
    }
}

(3)实现确认回调/confirm/iCallback 
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate./confirm/iCallback {
    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }
}

(4)生产者
@RestController
@RequestMapping("//confirm/i")
@Slf4j
public class ProducerController {
    public static final String /confirm/i_EXCHANGE_NAME = "/confirm/i.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyCallBack myCallBack;

    //依赖注入rabbitTemplate并设置它的回调对象
    @PostConstruct
    public void init() {
        rabbitTemplate.set/confirm/iCallback(myCallBack);
    }

    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend(/confirm/i_EXCHANGE_NAME, "key", message, new CorrelationData("1"));
    }
}

(5)消费者
@Component
@Slf4j
public class /confirm/iConsumer {
    public static final String /confirm/i_QUEUE_NAME = "/confirm/i.queue";

    @RabbitListener(queues = /confirm/i_QUEUE_NAME)
    public void receiveMsg(Message message) {
        log.info("接受到队列 /confirm/i.queue 消息:{}", new String(message.getBody()));
    }
}

测试:
http://localhost:8080//confirm/i/sendMessage/你好鸭

开启了生产者确认机制的情况下,交换机接收到消息后,发现该消息不可路由,那么消息会被直接丢弃
解决办法:使用ReturnsCallback回调
(1)配置文件
添加:spring.rabbitmq.publisher-returns=true
spring.rabbitmq.host=192.168.196.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
spring.rabbitmq.publisher-/confirm/i-type=correlated
spring.rabbitmq.publisher-returns=true
(2)实现回退回调ReturnsCallback
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnsCallback {
    
    @Override
    public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机已经收到 id 为:{}的消息", id);
        } else {
            log.info("交换机还未收到 id 为:{}消息,由于原因:{}", id, cause);
        }
    }

    
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.err.println("ReturnedMessage: " + returned);
    }
}
(3)依赖注入rabbitTemplate并设置它的回调对象
@PostConstruct
public void init() {
    //设置确认回调
    rabbitTemplate.set/confirm/iCallback(myCallBack);
    
    rabbitTemplate.setMandatory(true);
    //设置回退消息交给谁处理
    rabbitTemplate.setReturnsCallback(myCallBack);
}

备份交换机:如果回退回调与备份交换机同时使用,备份交换机优先级高
当消息路由失败时,可以在该交换机处设置备份交换机,将路由失败的消息路由到备份交换机上
//声明交换机并指明其备份交换机
@Bean("/confirm/iExchange")
public DirectExchange /confirm/iExchange() {
    ExchangeBuilder exchangeBuilder =
            ExchangeBuilder.directExchange(/confirm/i_EXCHANGE_NAME)
                    .durable(true)
                    .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);//指明备份交换机
    return (DirectExchange) exchangeBuilder.build();
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/630582.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号