栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ消息可靠性 (投递+消费)

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ消息可靠性 (投递+消费)

文章目录
    • Rabbitmq的消息可靠性投递
    • Rabbitmq的消息可靠性投递confirmCallback
    • Rabbitmq的消息可靠性投递returnCallback
    • Rabbitmq的消息确机制ACK

Rabbitmq的消息可靠性投递

什么是消息的可靠性投递

保证消息百分百发送到消息队列中去
1 保证mq节点成功接受消息,消息发送端需要接受到mq服务端接受到消息的确认应答
2 完善的消息补偿机制,发送失败的消息可以再感知并⼆次处理

RabbitMQ消息投递路径

⽣产者–>交换机->队列->消费者

通过两个的点控制消息的可靠性投递

⽣产者到交换机 通过confirmCallback
交换机到队列 通过returnCallback

建议

开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量
下降严重,不是⾮常重要的消息不建议⽤消息确认机制

Rabbitmq的消息可靠性投递confirmCallback

⽣产者到交换机

通过confirmCallback,⽣产者投递消息后,如果Broker收到消息后,会给⽣产者⼀个ACK。⽣产者通过ACK,可以确认这条消息是否正常发送到Broker,这种⽅式是消息可靠性投递的核⼼

开启confirmCallback

//旧版,确认消息发送成功,通过实现ConfirmCallBack接⼝,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
//新版,NONE值是禁⽤发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调⽅法
spring.rabbitmq.publisher-confirm-type:correlated
@SpringBootTest
class ApplicationTests {

	@Autowired
	private RabbitTemplate template;

	
	@Test
	void testConfirmCallback(){

		template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

			
			@Override
			public void confirm(CorrelationData correlationData, boolean ack, String cause) {

				System.out.println("ConfirmCallback======>");
				System.out.println("correlationData======>correlationData="+correlationData);
				System.out.println("ack======>ack="+ack);
				System.out.println("cause======>cause="+cause);

				if(ack){
					System.out.println("发送成功");
					//更新数据库 消息的状态为成功  TODO
				}else {
					System.out.println("发送失败,记录到日志或者数据库");
					//更新数据库 消息的状态为失败  TODO
				}

			}
		});

		//数据库新增一个消息记录,状态是发送  TODO

		//发送消息
		template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单");
	}


Rabbitmq的消息可靠性投递returnCallback

交换机到队列

通过returnCallback,消息从交换器发送到对应队列失败时触发

两种模式

交换机到队列不成功,则丢弃消息(默认)
交换机到队列不成功,返回给消息⽣产者,触发returnCallback

第⼀步 开启returnCallback配置

spring.rabbitmq.publisher-returns=true

第⼆步 修改交换机投递到队列失败的策略(丢弃或者返回给消息⽣产者)

//为true,则交换机处理消息到路由失败,则会返回给⽣产者
spring.rabbitmq.template.mandatory=true
@SpringBootTest
class ApplicationTests {

	@Autowired
	private RabbitTemplate template;

	
	@Test
	void testReturnCallback(){

		template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {

			@Override
			public void returnedMessage(ReturnedMessage returned) {
				int code = returned.getReplyCode();
				System.out.println("code="+code);
				System.out.println("returned="+returned.toString());

			}
		});

		template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new","新订单ReturnsCallback");
	}
}
Rabbitmq的消息确机制ACK

消费者从broker中监听消息,需要确保消息被合理处理

ACK介绍

消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列
中删除,消费者在处理消息出现了⽹络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放⼊队列中,只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。

消息的ACK确认机制默认是打开的,消息如未被进⾏ACK的消息确认机制,这条消息被锁定Unacked

确认⽅式

⾃动确认(默认)
⼿动确认 manual
其他(基本不⽤,忽略)

spring:
 rabbitmq:
  #开启⼿动确认消息,如果消息重新⼊队,进⾏重试
  listener:
   simple:
    acknowledge-mode: manual
@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {

    @RabbitHandler
    public void messageHandler(String body, Message message, Channel channel) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("body="+body);

        //复杂业务逻辑
        
        //告诉broker,消息已经被确认
        //成功确认,使⽤此回执⽅法后,消息会被rabbitmq broker 删除
        channel.basicAck(msgTag,false);

        //告诉broker,消息拒绝确认
        //channel.basicNack(msgTag,false,true);

        //channel.basicReject(msgTag,true);

    }
}

deliveryTag介绍

表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加

basicNack和basicReject介绍

basicReject⼀次只能拒绝接收⼀个消息,可以设置是否requeue。
basicNack⽅法可以⽀持⼀次0个或多个消息的拒收,可以设置是否requeue。

⼈⼯审核异常消息

设置重试阈值,超过后确认消费成功,记录消息,⼈⼯处理

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/868501.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号