栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

RabbitMQ—SpringBoot中实现死信队列

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ—SpringBoot中实现死信队列

文章目录
    • 一、什么是死信、死信队列?
      • 1.死信
      • 2.死信队列
    • 二、如何配置死信队列?
      • 1.yml配置文件
      • 2.声明业务队列和死信队列
      • 3.生产者
      • 4.消费者
      • 5.RabbitMQ控制台检验
    • 三、测试死信及如何处理死信
      • 1.自动ack超过重试次数进入死信队列
      • 2.手动签收失败不重回队列进入死信队列
      • 3.消息在队列存活的时间大于TTL进入死信队列
      • 4.消息队列中消息的数量超过最大长度进入死信队列
    • 配置踩的坑

一、什么是死信、死信队列? 1.死信

一条消息经历过下面4种情况任意一种时,会变成死信

  • 1.消费者端配置自动ack acknowledge-mode: auto在重试几次后(配置文件配置默认是3)后进入到死信队列
  • 2.消费者端使用basic.reject和basic.nack拒绝签收消息,并且配置requeue参数是false(即不重回原来的队列)
  • 3.消息在队列存活的时间大于TTL
  • 4.消息队列中消息的数量超过最大长度
2.死信队列

一条消息变为死信时会被重新发布的交换机叫死信交换机,与死信交换机绑定的队列叫做死信队列。跟普通的队列没有多大的区别,多了几个参数而已。如果没有为队列配置死信交换机,则原有的消息被抛弃。用下面的图来展示一下

二、如何配置死信队列?

项目版本:springboot:2.3.6.RELEASE。
引入的依赖:spring-boot-starter-amqp和spring-rabbit

1.yml配置文件
spring:
  rabbitmq:
    host: 192.168.184.128
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: auto # 设置自动确定(ack) manual:为手动确定(即需要调用channel.basicAck才会从队列中删除消息)
        prefetch: 1              #表示消费者端每次从队列拉取多少个消息进行消费,直到手动确认消费完毕后,才会继续拉取下一条
        default-requeue-rejected: false #消费被拒绝时 true:为重回队列 false为否
        retry:
          enabled: true         #是否支持重试,默认false
          max-attempts: 3       #重试最大次数,默认3次
          max-interval: 1000ms #重试最大间隔时间

2.声明业务队列和死信队列

通过下面的配置将业务队列和死信队列绑定起来,当一条消息从业务队列中变成死信后会重发到死信交换机上,重新消费。

   //声明业务queue时绑定死信交换机、死信队列及相关参数配置
   @Bean
   public Queue queue(){
       Map args = new HashMap<>(2);
       args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
       args.put("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY);
       //队列的长度为3
       args.put("x-max-length",3);
       //队列中的消息超过3s后过期
       args.put("x-message-ttl",new Long(3000));
       Queue queue = new Queue(QUEUE, true,false,false,args);
       return queue;
   }

上面的配置中两个条件会让消息变为死信

  • args.put(“x-max-length”,3):队列中消息总数大于3
  • args.put(“x-message-ttl”,new Long(3000)):消息超过3s还未被消费
3.生产者

正常使用RabbitTemplate提供的api发送消息即可

rabbitTemplate.convertAndSend(RabbitMQStudentCourseTopicConfig.EXCHANGE,            RabbitMQStudentCourseTopicConfig.ROUTING_KEY_NAME,new MqMessage(studentId,courseId));
4.消费者

消费者分为两类,业务消费者和死信消费者

业务消费者

会报除数不能0的异常

int count=0;

@RabbitListener(queues = RabbitMQStudentCourseTopicConfig.QUEUE)
public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
    MqMessage mqMessage = MessageHelper.msgToObj(message, MqMessage.class)
    log.info("重试次数:{}",++count);
    int i = 5/0;
    log.info("消费成功");
}

死信队列监听者

@RabbitListener(queues = RabbitMQStudentCourseTopicConfig.DEAD_LETTER_QUEUE)
public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {
    System.out.println("收到死信消息:" + new String(message.getBody()));
    channel.basicAck(deliveryTag,false);
}
5.RabbitMQ控制台检验


已经创建好了两个队列

三、测试死信及如何处理死信 1.自动ack超过重试次数进入死信队列


从上图的重试时间及次数来看也符合配置max-interval: 1000ms、max-attempts: 3

2.手动签收失败不重回队列进入死信队列

yml文件配置:acknowledge-mode: manual。

消费者代码修改如下

  int count=0;
  @RabbitListener(queues = RabbitMQStudentCourseTopicConfig.QUEUE)
  public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
      MqMessage mqMessage = MessageHelper.msgToObj(message, MqMessage.class);
      log.info("重试次数:{}",++count);
      try{
          int i = 5/0;
      }catch (Exception e){
      	  //第三个false:失败不重回队列
          channel.basicNack(deliveryTag,false,false);
          log.info("消费失败");
          throw e;
      }
      log.info("消费成功");
  }

注意:当yml文件配置消费方签收方式为手动时,重试次数有时候起作用,有时候不起作用。但是起作用时不是失败多少次才会进死信队列,而是只要执行channel.basicNack(deliveryTag,false,false);这行代码就会进死信队列


那么如何实现手动签收时,失败3次才进入死信队列呢?我们放到下一篇文章中说。

3.消息在队列存活的时间大于TTL进入死信队列

这个测试只要将业务消费者注释掉即可,设置的TTL是3s,测试结果符合预期

4.消息队列中消息的数量超过最大长度进入死信队列

还是把消费者端代码注释,然后运行,看结果

生产者投递消息时间:2021-10-05 18:30:51,id:2904,
生产者投递消息时间:2021-10-05 18:31:08,id:430,
生产者投递消息时间:2021-10-05 18:31:15,id:4646,
生产者投递消息时间:2021-10-05 18:31:29,id:395,
收到死信消息:{"id":2904}

因为队列长度设置的是3,当投放第4个消息即id为395时,最早进入队列id2904被弹出进入死信队列,可见rabbitmq是一个先进先出的队列。


配置踩的坑

问题1

#method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'STUDENT_COURSE_QUEUE' in vhost '/': received none but current is the value '60000' of type 'signedint', class-id=50, method-id=10)

原因1:在为队列配置死信交换机前已经声明了该队列,rabbitmq不允许重复,所以把之前的队列删掉就可以了
原因2:如果在消费端使用了@RabbitListener的queuesToDeclare属性一定要配置好,这个属性就是显示创建queue。否则也会发生上面的错误

解决办法:通过管理控制台将原来的queue删掉,使用queues属性指定要创建的queue就可以。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/297248.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号