栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ 学习(三)——RabbitMQ的高级特性

RabbitMQ 学习(三)——RabbitMQ的高级特性

文章目录
  • 前言
  • 一、生产者可靠性
    • 1 生产者消息传递的可靠性
    • 1.1 模式介绍
    • 1.2 确认模式
      • 1.2.1 示例代码
      • 1.2.2 pom.xml
      • 1.2.3 RabbitMQ 配置
      • 1.2.4 spring 整合配置文件
      • 1.2.5 测试类
      • 1.2.6 测试结果
    • 1.3 退回模式
      • 1.3.1 测试类
      • 1.3.2 测试结果
  • 二、消费端消息传递的可靠性
    • 2.1 Consume Ack
    • 2.2 示例代码
      • 2.2.1 pom.xml
      • 2.2.2 rabbitmq.properties
      • 2.2.3 spring-rabbitmq.xml
      • 2.2.4 AckListener.java
      • 2.2.5 测试类
      • 2.2.6 测试结果
    • 2.3 Consumer 限流
  • 三、 TTL
    • 3.1 示例代码
      • 3.1.1 配置文件
      • 3.1.2 测试代码
    • 3.2 小结
  • 四、 死信队列
    • 4.1 介绍
    • 4.2 示例代码
      • 4.2.1 配置文件
      • 4.2.2 测试类
      • 4.2.3 消费者代码
        • 4.2.3.1 DlxListener.java
        • 4.2.3.2 配置文件
        • 4.2.3.3 测试类
      • 4.2.4 测试结果
  • 五、 延迟队列
    • 5.1 示例代码
      • 5.1.1 pom.xml
      • 5.1.2 配置文件
      • 5.1.3 测试类
      • 5.1.4 测试结果
    • 5.2 消费者
      • 5.2.1 监听类
      • 5.2.2 配置文件
      • 5.2.3 测试类
      • 5.2.4 消费结果![请添加图片描述](https://img-blog.csdnimg.cn/f23b8a59a6a74ebca44f5f5a4d10ef5c.png?x-oss-process=image/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBA5qmZ5a2Q5piv6JOd6Imy55qE,size_20,color_FFFFFF,t_70,g_se,x_16)


前言 记录我学习的 RabbitMQ 的高级特性。包括:生产端 和 消费端的消息可靠性、死信队列、延迟队列 和 TTL。

提示:以下是本篇文章正文内容,下面案例可供参考

一、生产者可靠性 1 生产者消息传递的可靠性

在使用 RabbitM 的时候,作为消息的生产者(发送者),我们希望杜绝任何情况下的消息丢失和投递失败。对此, RabbitMQ 提供了两种方法来确保投递过程的正确性。

  1. confirm 确认模式
  2. return 退回模式
1.1 模式介绍

RabbitMQ 的整个消息投递的过程可以划分为下面的步骤:
producer --> rabbitmq broker --> exchange --> queue --> consumer

  • 确认模式:消息从产生(producer)到 exchange(交换机)这一部分,若是出现问题,会返回一个 /confirm/iCallBack;
  • 退回模式:消息从 exchange(交换机)到 queue(队列)这一部分,消息投递失败,会返回一个 returnCallBack。

我们就是 通过 这两个 Callback 确保消息投递的可靠性。

1.2 确认模式
  1. 确认模式开启:ConnectionFactory 中开启 publisher-confirms=“true”
  2. 在 rabbitTemplate 定义 ConfirmCallBack 回调函数.
  3. 发送消息,若是在消息投递到 exchange 之前出错,我们的回调函数的逻辑就会执行,进行进一步处理。
// 定义回调函数
rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
    		
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("/confirm/i方法被执行了....");
                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //接收失败
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }

            }
        });
1.2.1 示例代码

通过 spring 整合 RabbitMQ 做示范。

1.2.2 pom.xml


    4.0.0

    org.example
    rabbitmq-return-firm
    1.0-SNAPSHOT

    
    
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        
        
            junit
            junit
            4.12
        
        
            org.springframework
            spring-test
            5.1.7.RELEASE
        

    

	
1.2.3 RabbitMQ 配置
rabbitmq.host=192.168.191.130
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/xzk
1.2.4 spring 整合配置文件



    
    

    
    

    
    

    
    
        
            
        
    

    
    

1.2.5 测试类
package com.kaikeba./confirm/i;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class Rabbit/confirm/iTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void test/confirm/i() throws InterruptedException {
        rabbitTemplate.set/confirm/iCallback(new RabbitTemplate./confirm/iCallback() {
            public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("/confirm/i方法被执行了....");
                if (ack) {
                    //接收成功
                    System.out.println("接收成功消息" + cause);
                } else {
                    //接收失败
                    System.out.println("接收失败消息" + cause);
                    //做一些处理,让消息再次发送。
                }

            }
        });

        rabbitTemplate.convertAndSend("test_/confirm/i_exchange", "item./confirm/i", "whovwhuhuwf ");
        //
        Thread.sleep(2000);
    }
}

1.2.6 测试结果

● 正常传递

● 我们把交换机改错,然后再传递

1.3 退回模式

当消息发送给Exchange后,Exchange路由到Queue失败时才会执行 ReturnCallBack

  1. 开启回退模式:publisher-returns=“true”;
  2. 设置ReturnCallBack,复写对应方法;
  3. 设置Exchange处理消息失败的模式:setMandatory。
  @Test
    public void testReturn() {
        // 设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);

        // 设置 ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了....");
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
                System.out.println(exchange);
                System.out.println(routingKey);
            }
        });

        //3. 发送消息
        rabbitTemplate.convertAndSend("test_/confirm/i_exchange", "item./confirm/i",
                "message returnback....");
    }
1.3.1 测试类

其他的东西和确认模式的一样

package com.kaikeba./confirm/i;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class Rabbit/confirm/iTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testReturn() {
        // 设置交换机处理失败消息的模式
        rabbitTemplate.setMandatory(true);

        // 设置 ReturnCallBack
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return 执行了....");
                System.out.println("message =" + message);
                System.out.println("replyCode = " + replyCode);
                System.out.println("replyText = " + replyText);
                System.out.println("exchange = " + exchange);
                System.out.println("routingKey = " + routingKey);
            }
        });

        //3. 发送消息
        rabbitTemplate.convertAndSend("test_/confirm/i_exchange", "item.confi--rm",
                "message returnback....");
    }
}

1.3.2 测试结果

二、消费端消息传递的可靠性 2.1 Consume Ack

ack 指的是 Acknowledge 确认;表示消费端收到消息后的确认方式,这种方式有三种:

  • 自动确认:acknowledge=“none”;
  • 手动确认:acknowledge=“manual”;
  • 根据异常情况确认:acknowledge=“auto”;这种方式比较麻烦,不常用。

其中,自动确认是指当消息一旦被消费者 Consumer 接收到,就自动确认收到,并将相应的消息 message 从 RabbitMQ 的消息缓存中移除;但是在实际业务中,消息接收到之后,很有可能业务处理出现异常,那么代码中的消息处理失败,队列中的消息已经被移除,就会造成消息丢失。此时,我们就需要设置手动确认的逻辑;在业务处理成功之后,调用 channel.basicAck() 手动签收,如果出现异常。则调用 channel.basicNack() ,让消息自动重发。

Consumer Ack 机制:

  1. 设置手动签收: acknowledge=“manual” ;
  2. 让监听器类实现 ChannelAwareMessageListener 接口;
  3. 如果消息处理成功, 则调用channel的 basicAck()签收 ;
  4. 如果消息处理失败,则调用channel的basicNack()拒绝签收,broker重新发送给consumer 。
2.2 示例代码 2.2.1 pom.xml


    4.0.0

    org.example
    rabbit-ack-consumer
    1.0-SNAPSHOT

    
    
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        
        
            junit
            junit
            4.12
        
        
            org.springframework
            spring-test
            5.1.7.RELEASE
        

    


2.2.2 rabbitmq.properties
rabbitmq.host=192.168.191.131
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/xzk
2.2.3 spring-rabbitmq.xml



    
    

    
    

    
    

    

    
    
        
    

2.2.4 AckListener.java
package com.kaikebai.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class AckListener implements ChannelAwareMessageListener {
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println(new String(message.getBody(), "UTF-8"));
            System.out.println("逻辑处理业务");

            // 模拟出异常
            int i = 3/0;
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            e.printStackTrace();
            // 拒绝签收
            
            channel.basicNack(deliveryTag, true, true);
            // 下面的也是拒绝签收消息,但是不推荐使用,每次只能拒绝一个,
            // channel.basicReject(deliveryTag, true);
        }
    }

    public void onMessage(Message message) {

    }
}

2.2.5 测试类

启动项目。试监听器开始工作

package com.kaikebai.listener;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class AckListererTest {

    @Test
    public void test() {
        while (true) {

        }
    }

}

2.2.6 测试结果

● 异常结果
● 正常情况

2.3 Consumer 限流


MQ 可以进行“削峰填谷”,当在短时间出现大量数据时(如秒杀活动);我们可以将消息存放到消息队列中,然后系统从消息队列中,每次确定特定数量的消息进行消费,以此达到限流的目的。

Consumer 限流机制:

  1. 确保 Ack 机制是手动确认;
  2. listener-container 设置属性,
    a. perfetch=1,表示消费端每次从 MQ 队列中取出一条消息进行消费,消费完毕之后,再去拉取下一条先息进行消费。
package com.kaikebai.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class QspListener implements ChannelAwareMessageListener {
    public void onMessage(Message message, Channel channel) throws Exception {
        Thread.sleep(1000);
        //1.获取消息
        System.out.println(new String(message.getBody()));
        //2. 处理业务逻辑
        //3. 签收
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
    }

    public void onMessage(Message message) {

    }
}

三、 TTL

全称是 Time To Live ,消息过期时间设置。队列或消息设置过期时间的话,在规定时间过后,队列中的消息将被重新处理;处理方式有两种:

  1. 进入死信队列:若是该队列有对应的死信队列,则消息过期之后,会根据配置的死信队列信息,转发到对应的四星队列中;
  2. 直接丢弃:若是没有配置死信队列,则消息过期之后,江北直接丢弃。

管控台中设置队列TTL :

3.1 示例代码 3.1.1 配置文件

rabbitmq.properties

rabbitmq.host=192.168.191.132
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/xzk

spring-rabbitmq.xml



    
    

    
    

    
    

    
    
        
            
        
    

    
        
            
        
    

    
    

3.1.2 测试代码
package com.kaikeba;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;


@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring/spring-rabbitmq.xml")
public class TtlQueueTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    

    
    @Test
    public void testQueueTtl() {
        rabbitTemplate.convertAndSend("test_ttl_exchange", "ttl.hello", "测试TTL队列");
    }

    
    @Test
    public void testMessageTtl() {
        // 消息处理对象,设置一些消息的参数信息
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            public Message postProcessMessage(Message message) throws AmqpException {
                return null;
            }

            public Message postProcessMessage(Message message, Correlation correlation) {
                // 1. 设置 message 的过期时间
                message.getMessageProperties().setExpiration("5000");

                return message;
            }
        };

        // 消息单独发送
        rabbitTemplate.convertAndSend("test_ttl_queue", "ttl.hi", "消息单独过期时间测试", messagePostProcessor);
        
    }
}

3.2 小结
  1. 消息过期可以从队列和消息本身两个维度进行设置;队列层次设置时,整个队列中的消息都以这个过期时间设置为准;在消息层次设置时,只对当前单个消息有效;同时设置时,那一个设置的时间短,那一个起作用。
  2. 针对单个消息设置过期时间时,只有当这个消息是队列中的第一个消息时(排在队列的首位),这个设置才会生效。否则设置不会生效。
  3. 单独设置消息过期时间是通过 messagePostProcessor 中的 postProcessMessage 方法 来设置的
四、 死信队列 4.1 介绍

死信队列:DLX(Dead Letter Exchange 死信交换机),因为有的 MQ产品是没有交换机这个设计的,所以我们一般直接叫死信队列。当消息成为 Dead Message 之后,可以被重新发送到另一个交换机,这个交换机就是 DLX 。

消息成为死信的三种情况:

  1. 队列消息长度到达限制;
  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队 列,requeue=false;
  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

4.2 示例代码 4.2.1 配置文件

rabbitmq.properties

rabbitmq.host=192.168.191.130
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/xzk

spring-rabbitmq.xml




    
    

    
    
    

    

    
    
    
        
            
        
    

    
    
        
        
            
            
            
            
            
            
            
            
        
    
    
    
        
            
        
    

    
    

4.2.2 测试类
package com.kaikeba.dlx;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class DeadLetterExchangeTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @Test
    public void testDlxexpression() {
        // 死信 -- 测试过期时间
        rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hehe", "死信队列,我是测试过期时间的!!!");
    }

    
    @Test
    public void testDlxMax() {
        // 死信 -- 数量超过
        for(int i = 0; i < 20; i++) {
            rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hehe", "死信队列,我是测试过期时间的!!!");
        }
    }

    
    @Test
    public void testDlxRef() {
        // 死信 -- 拒收
        rabbitTemplate.convertAndSend("test_exchange_dlx", "test.dlx.hehe", "死信队列,我是测试过期时间的!!!");
    }
}

4.2.3 消费者代码

消费者代码是模拟消费端拒接消息的场景

4.2.3.1 DlxListener.java
package com.kaikeba.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class DlxListener implements ChannelAwareMessageListener {
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));
            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            int i = 3/0;//出现错误
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }
    }

    public void onMessage(Message message) {

    }
}

4.2.3.2 配置文件

rabbitmq.properties

rabbitmq.host=192.168.191.130
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/xzk

spring-rabbitmq.xml




    
    

    
    
    

    

    
    
    
        
            
        
    

    


    
        
    

    
    

4.2.3.3 测试类
package com.kaikeba.listener;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class DlsConsumerTest {

    @Test
    public void test() {
        while (true) {

        }
    }
}

4.2.4 测试结果
  1. 交换机
  2. 交换机绑定关系
  • 交换机


  • 队列

test_queue_dlx 是正常交换机,我们设置的是10秒,10 秒之后,就转发到死信队列 queue_dlx。

五、 延迟队列

延迟队列:即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

我们模拟一个下单需求,来说明 RabbitMQ 的延时队列的实现

模拟需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。
  2. 新用户注册成功7天后,发送短信问候。

一般情况:
有的 MQ 产品是有延迟队列的,可以直接实现;

实现方式:

  1. 定时器
  2. 延迟队列


而我们的 RabbitMQ 中没有延迟队列的直接实现;但是我们可以通过 TTL + 死信队列 来达到延迟队列的效果。

我们把正常队列的消息过期时间设置为 30min;将消费投递到正常队列,30min 过后消息过期,转发到对应的死信队列,我们的程序监听死信队列,并消费其中的消息。就达到了消息延迟 30min 的目的。

实现过程:

  1. 定义正常交换机(order_exchange)和队列(order_queue)
  2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
  3. 绑定,设置正常队列过期时间为30分钟
5.1 示例代码 5.1.1 pom.xml


    4.0.0

    org.example
    rabbit-delay-producer
    1.0-SNAPSHOT

    
    
        
            org.springframework
            spring-context
            5.1.7.RELEASE
        
        
            org.springframework.amqp
            spring-rabbit
            2.1.8.RELEASE
        
        
            junit
            junit
            4.12
        
        
            org.springframework
            spring-test
            5.1.7.RELEASE
        

    


5.1.2 配置文件

rabbitmq.xml

rabbitmq.host=192.168.191.128
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.virtual-host=/xzk

spring-rabbitmq.xml




    
    

    
    
    

    

    
    

    
    
    
    
    
        
            
        
    

    
    
        
        
            
            
            
            
            
            
        
    
    
        
            
        
    

5.1.3 测试类
package com.kaikeba.delay.kaikeba.delay;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.Date;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class DelayProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testDelay() {
        // 1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange", "order.msg","订单信息:" + new Date());
    }

}

5.1.4 测试结果
  • 交换机

  • 队列
5.2 消费者

消费者监听死信队列,模拟延时效果

5.2.1 监听类
package com.kaikeba.delay;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

@Component
public class DelayListener  implements ChannelAwareMessageListener {
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            //1.接收转换消息
            System.out.println(new String(message.getBody()));
            //2. 处理业务逻辑
            System.out.println("处理业务逻辑...");
            System.out.println("根据订单id查询其状态...");
            System.out.println("判断状态是否为支付成功");
            System.out.println("取消订单,回滚库存....");
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {
            //e.printStackTrace();
            System.out.println("出现异常,拒绝接受");
            //4.拒绝签收,不重回队列 requeue=false
            channel.basicNack(deliveryTag,true,false);
        }

    }

    public void onMessage(Message message) {

    }
}

5.2.2 配置文件

rabbitmq 的配置文件和生产者一样。
spring-rabbitmq.xml




    
    

    
    
    

    

    
    

    
    
        
    


5.2.3 测试类
package com.kaikeba.delay;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq.xml")
public class DelayListenerTest {

    @Test
    public void test() {
        while (true) {

        }
    }
}

5.2.4 消费结果
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/389421.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号