栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ死信队列

RabbitMQ死信队列

死信队列是什么?

死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。那么死信是个什么东西呢?

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,如果队列里的消息出现以下情况:

  1. 消息被否定确认,使用 channel.basicNack 或 channel.basicReject ,并且此时requeue 属性被设置为false。

  2. 消息在队列的存活时间超过设置的TTL时间。

  3. 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

代码实战 1、准备阶段

这里我准备了一个SpringBoot基础环境代码,我会在此基础上进行集成,代码我已上传,地址如下:

SpringBootbase: SpringBoot基础项目框架

基础薄弱的同学,可以下载下来,跟我下面的步骤一步一步走,就可以出来效果了,再集成到你自己的项目中

代码下载完之后的目录结构是这样的:

 2、添加相关依赖jar包

 在这个代码基础上,我们需要先引入RabbitMQ的依赖,在pom.xml中加入下面依赖:


    org.springframework.boot
    spring-boot-starter-amqp
3、yml文件添加相关配置

我们再application.yml文件中新增下面代码:

rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 发送确认
    publisher-/confirm/is: true
    # 路由失败回调
    publisher-returns: true
    template:
      # 必须设置成true 消息路由失败通知监听者,false 将消息丢弃
      mandatory: true
    listener:
      simple:
        # 每次从RabbitMQ获取的消息数量
        prefetch: 1
        default-requeue-rejected: false
        # 每个队列启动的消费者数量
        concurrency: 1
        # 每个队列最大的消费者数量
        max-concurrency: 1
        # 签收模式为手动签收-那么需要在代码中手动ACK
        acknowledge-mode: manual

还需要添加队列交换名称等信息,代码如下:

#死信消息模型
dead:
  exchange-dead: order.dead.exchange
  routing-dead-key: order.dead.routingKey
  queue-dead: order.dead.queue
  normal-queue: order.normal.queue
  normal-exchange: order.normal.exchange
  normal-routing-key: order.normal.routingKey
  #设置过期时间(单位:毫秒)
  expire: 3000

最后结构如下图:

 各位一定要注意缩进哦,不然到时候有问题的

4、RabbitMQ配置类 

在config包下面新建RabbitmqConfig配置类,代码如下:

package org.wujiangbo.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.wujiangbo.entity.RabbitMQProperties;
import java.util.HashMap;
import java.util.Map;


@Configuration
@Slf4j
public class RabbitmqConfig {

    @Autowired
    private RabbitMQProperties rabbitMQProperties;

    //创建普通队列
    @Bean
    public Queue normalQueue() {
        Map arguments = new HashMap<>(2);
        // 设置死信参数
        // 绑定死信交换机
        arguments.put("x-dead-letter-exchange", rabbitMQProperties.getExchangeDead());
        // 绑定死信的路由key
        arguments.put("x-dead-letter-routing-key", rabbitMQProperties.getRoutingDeadKey());
        return new Queue(
                rabbitMQProperties.getNormalQueue(),
                true,
                false,
                false,
                arguments);
    }
    //创建普通交换机
    @Bean
    public Exchange normalExchange(){
        return new TopicExchange(rabbitMQProperties.getNormalExchange(),true,false);
    }
    //绑定普通队列和普通交换机
    @Bean
    public Binding normalBind(){
        return BindingBuilder.bind(normalQueue()).to(normalExchange()).with(rabbitMQProperties.getNormalRoutingKey()).noargs();
    }

    //创建死信交换机
    @Bean
    public Exchange deadExchange(){
        return new TopicExchange(rabbitMQProperties.getExchangeDead(),true,false);
    }

    //创建死信队列
    @Bean
    public Queue deadQueue(){
        return new Queue(rabbitMQProperties.getQueueDead(),true,false,false);
    }

    //绑定死信队列与死信交换机
    @Bean
    public Binding realBindDead(){
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(rabbitMQProperties.getRoutingDeadKey()).noargs();
    }
}

里面需要依赖两个实体类,代码如下:

package org.wujiangbo.entity;

import lombok.Data;
import lombok.ToString;

import java.io.Serializable;


@ToString
@Data
public class OrderDto implements Serializable {

    private Long id;//订单编号
    private Long userId;//订单归属人ID
    private Double orderMoney;//订单金额
    private String orderStatus;//订单状态(0:待支付;1:已支付;2:未支付(超时))
}

还有一个RabbitMQProperties类,代码如下:

package org.wujiangbo.entity;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = "dead")
@Data
public class RabbitMQProperties {

    private String exchangeDead;//死信队列交换机
    private String routingDeadKey;//死信队列路由键
    private String queueDead;//死信队列名称
    private String normalQueue;//正常队列名称
    private String normalExchange;//正常交换机名称
    private String normalRoutingKey;//正常路由键
    private String expire;//过期时间

}

此时项目结果如下:

 

5、生产者代码

生产者ProducerTest类代码如下:

package org.colin.mq;

import lombok.extern.slf4j.Slf4j;
import org.colin.entity.OrderDto;
import org.colin.entity.RabbitMQProperties;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;


@Configuration
@EnableScheduling
@Slf4j
public class ProducerTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitMQProperties rabbitMQProperties;

    //模拟下单操作
    @Scheduled(fixedDelay = 10 * 1000)//每隔X秒运行一次
    public void addOrder() {
        log.info("-------------------------------------开始下单啦-------------------------------------");
        //测试数据
        OrderDto dto = new OrderDto();
        dto.setId(7L);
        dto.setOrderMoney(900D);
        dto.setUserId(14L);
        dto.setOrderStatus("0");//订单状态(0:待支付;1:已支付;2:未支付(超时))
        try {
            //将订单消息发到MQ服务器中
            rabbitTemplate.convertAndSend(
                    rabbitMQProperties.getNormalExchange(),
                    rabbitMQProperties.getNormalRoutingKey(),
                    dto, new MessagePostProcessor() {
                //在消息发送之前应用于消息的处理器
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    MessageProperties messageProperties = message.getMessageProperties();
                    //设置过期时间TTL
                    messageProperties.setExpiration(String.valueOf(rabbitMQProperties.getExpire()));
                    //设置持久化
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                }
            });
        } catch (Exception e) {
            log.error("下单失败:{}", e);
        }
    }
}

该生产者代码我是用一个定时任务做的,定时每10秒钟发一次消息,模拟下单操作

6、消费者代码
package org.wujiangbo.mq;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;
import org.wujiangbo.entity.OrderDto;
import java.io.IOException;
import java.util.Map;


@Component
@Slf4j
public class ConsumerTest {

    
    @RabbitListener(queues = {"${dead.queue-dead}"})
    public void handleDeadMessage(OrderDto dto, Message message, Channel channel, @Headers Map headers) throws IOException {
        log.info("接收到死信队列的消息:{}", dto.toString());
        //回复ack
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
}

注意:

我的消费者类中,没有写方法去监听正常的队列,而是只写了一个方法监听死信队列,这样做的目的,就是为了模拟用户下单一直不支付的场景,那么正常队列中的消息就会一直不会被消费而长期存留在队列中,而我在yml中设置的超时时间是3秒,也就是说,订单消息在队列中3秒内没有消费者消费的话,那就会被路由到死信交换机中,从而进入死信队列中,那么我这里监听着死信队列,就可以拿到消息了,此时拿到的所有消息都是过期未支付的订单,就可以做业务处理了,比如修改数据库订单状态为未支付,然后将库存加回数据库,让其他用户也可以下单购买

7、测试

启动项目,观察控制台:

从控制台可以看出,每次发消息3秒之后,监听着死信队列的消费者就可以拿到消息了, 因为消息在正常队列中存在3秒之后就到了过期时间,就会被自动路由到死信交换机中,从而抵达死信队列,那么就会被监听到消费掉

至此,测试成功了

完整代码地址:SpringBootRabbitMQ001: RabbitMQ死信队列测试

结束语

这样的死信队列在很多实际业务场景都可以得到很好的应用,希望大家学到后,能灵活运用

最后总结了一张图:

 

  • 大家如果还有任何疑问,可以留言,我会第一时间回复的

  • 最后别忘点赞哦,非常感谢大家的支持与厚爱,我会不断分享更多干货给大家的哈

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/674093.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号