栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ实现延迟队列

RabbitMQ实现延迟队列

实现方式一:死信队列

AMQP协议和RabbitMQ队列本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能。
但是我们可以通过RabbitMQ的两个特性来曲线实现延迟队列:

1、Time To Live(TTL)

RabbitMQ可以针对Queue设置x-expires 或者 针对Message设置 x-message-ttl,来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)
RabbitMQ针对队列中的消息过期时间有两种方法可以设置。
A: 通过队列属性设置,队列中所有消息都有相同的过期时间。
B: 对消息进行单独设置,每条消息TTL可以不同。

2、Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。

x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

队列出现dead letter的情况

1、消息或者队列的TTL过期
2、队列达到最大长度
3、消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false
代码实现
首先加入依赖


            org.springframework.amqp
            spring-rabbit-test
            test
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            cn.hutool
            hutool-all
            5.7.16
        
        
            com.alibaba
            fastjson
            1.2.78
        

编写配置文件
application.yml

spring:
  application:
    name: delay-queue
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: admin123456
    virtual-host: delays1
server:
  port: 8082

编写队列配置文件

#订单业务队列的名称,交换机名称、路由key、超时时间
delay.bussiness.queue: order_time_out_15s
delay.bussiness.excahnge: order_time_out_15s_exchange
delay.bussiness.route: order_time_out_15s_776

#死信队列的名称,交换机名称、路由key、超时时间
delay.dead.queue: dead_order_time_out_15s
delay.dead.excahnge: dead_order_time_out_15s_exchange
delay.dead.route: dead_order_time_out_15s_666
delay.bussiness.order.timeout: 15000

#利用rabbitmq_delayed_message_exchange实现延迟队列的方式
#插件实现订单业务队列的名称,交换机名称、路由key、超时时间
delay.plugins.queue: plugin_order_delay_30
delay.plugins.exchange: plugin_order_exchange_30
delay.plugins.route.key: plugin_order_route_key_30
delay.plugin.timeout: 30000

编写配置读取类

@Configuration
@PropertySource("classpath:rabbitmqs.properties")
@Data
public class OrderDelayConfig {

    @Value("${delay.bussiness.queue}")
    private String orderDelayQueueName;

    @Value("${delay.bussiness.excahnge}")
    private String orderDelayExchangeName;

    @Value("${delay.bussiness.route}")
    private String orderDelayRouteKey;


    @Value("${delay.dead.queue}")
    private String orderDeadDelayQueueName;

    @Value("${delay.dead.excahnge}")
    private String orderDeadDelayExchangeName;

    @Value("${delay.dead.route}")
    private String orderDeadDelayRouteKey;

    @Value("${delay.bussiness.order.timeout}")
    private Long timeout;

    @Value("${delay.plugins.queue}")
    private String pluginOrderQueueName;

    @Value("${delay.plugins.exchange}")
    private String pluginOrderExchangeName;

    @Value("${delay.plugins.route.key}")
    private String pluginOrderRouteKey;

    @Value("${delay.plugin.timeout}")
    private Long pluginTimeout;

}

编写队列、交换机创建、交换机和队列、路由key值绑定的配置类

  //rabbitMq内置死信队列信息
    private final String dlexchange = "x-dead-letter-exchange";
    private final String dlRouteKey = "x-dead-letter-routing-key";
    private final String ttl = "x-message-ttl";
    @Autowired
    private  OrderDelayConfig orderDelayConfig;

    //创建死信交换机
    @Bean("orderDeadExchange")
    public DirectExchange deadTopicExchange() {
        return new DirectExchange(orderDelayConfig.getOrderDeadDelayExchangeName());
    }
    //创建业务交换机
    @Bean("orderExchange")
    public DirectExchange payTopicExchange() {
        return new DirectExchange(orderDelayConfig.getOrderDelayExchangeName());
    }
    
    //创建死信队列
    @Bean("orderDeadQueue")
    public Queue deadQueue() {
        return new Queue(orderDelayConfig.getOrderDeadDelayQueueName());
    }

    
    @Bean("orderQueue")
    public Queue payQueue() {
        Map params = new HashMap<>();
        //设置队列的过期时间
        params.put(ttl,  orderDelayConfig.getTimeout());
        //声明当前队列绑定的死信交换机
        params.put(dlexchange, orderDelayConfig.getOrderDeadDelayExchangeName());
        //声明当前队列的死信路由键
        params.put(dlRouteKey, orderDelayConfig.getOrderDeadDelayRouteKey());

        return QueueBuilder.durable(orderDelayConfig.getOrderDelayQueueName()).withArguments(params).build();

    }
    
    //订单数据队列绑定交换机和route的key值
    @Bean
    public Binding delayBindingA(@Qualifier("orderQueue") Queue queue,
                                 @Qualifier("orderExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(orderDelayConfig.getOrderDelayRouteKey());
    }


    //死信队列与死信交换机进行绑定
    @Bean
    public Binding BindingErrorQueueAndExchange(@Qualifier("orderDeadQueue") Queue deadQueue,
                                                @Qualifier("orderDeadExchange") DirectExchange exchange) {
        return BindingBuilder.bind(deadQueue).to(exchange).with(orderDelayConfig.getOrderDeadDelayRouteKey());

    }

以上已经完成了配置工作,下面需要完成业务代码实现
1、新建订单实体类

@Data
public class Order {

    
    private String orderNo;
    
    private BigDecimal price;
    
    private int prodductNum;
    
    private BigDecimal totalAmount;

    
    private Date createTime;
}

定义消息发送和消息消费

@Component
@EnableScheduling
@Slf4j
public class OrderDelayQueue {

    private RabbitTemplate rabbitTemplate;
    private OrderDelayConfig orderDelayConfig;
    private final static String orderQueueName = "dead_order_time_out_15s";


    public OrderDelayQueue(RabbitTemplate rabbitTemplate, OrderDelayConfig orderDelayConfig) {
        this.rabbitTemplate = rabbitTemplate;
        this.orderDelayConfig = orderDelayConfig;
    }


    @Scheduled(cron = "0/30 * * * * ?")
    public void sendOrderMsg() {
        Order order;
        for (int i = 0; i < 3; i++) {
            // Thread.sleep(1000);
            order = new Order();
            order.setOrderNo(new Snowflake().nextIdStr());
            order.setCreateTime(new Date());
            rabbitSendMsg(JSON.toJSONString(order));
        }


    }

    
    public void rabbitSendMsg(String msg) {
        rabbitTemplate.convertAndSend(orderDelayConfig.getOrderDelayExchangeName(),
                orderDelayConfig.getOrderDelayRouteKey(), msg);
    }

    //消费死信队列的消息
    @RabbitListener(queues = orderQueueName)
    public void infoConsumption(String data) throws Exception {
        //此处编写执行订单超时状态的逻辑代码
        final String nowformat = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
        final Order order = JSONObject.parseObject(data, Order.class);
        final long diff = (System.currentTimeMillis() - order.getCreateTime().getTime()) / 1000;
        log.info(order.getOrderNo() + "死信队列========:订单已经超时了" + "失效时间" + diff + "秒");
    }

}

到此完成全部代码,启动项目执行即可完成死信队列实现延迟队列功能。

方式2:利用rabbitmq_delayed_message_exchange实现延迟队列 安装插件

1、下载延时消息插件:https://www.rabbitmq.com/community-plugins.html

2、将下载的文件放在rabbitmq的安装路径plugins文件中


安装插件
打开rabbitmq的命令界面
执行命令 rabbitmq-plugins enable rabbitmq_delayed_message_exchange

入上图所示表示安装成功

代码实现

配置信息,在第一种方式中已配置

1、队列、交换机、路由key值创建,绑定
 //插件实现创建业务交换机
    @Bean("pluginOrderExchange")
    public CustomExchange pluginExchange() {
        Map args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(orderDelayConfig.getPluginOrderExchangeName(),
                "x-delayed-message", true, false, args);
    }


    
    @Bean("pluginOrderQueue")
    public Queue pluginQueue() {
        return new Queue(orderDelayConfig.getPluginOrderQueueName());

    }

    
    @Bean
    public Binding delayBindingPlugin(@Qualifier("pluginOrderQueue") Queue queue,
                                      @Qualifier("pluginOrderExchange") CustomExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(orderDelayConfig.getPluginOrderRouteKey()).noargs();
    }
2、消息发送,消息消费
@Component
@EnableScheduling
@Slf4j
public class OrderPluginDelayQueue {

    private RabbitTemplate rabbitTemplate;
    private OrderDelayConfig orderDelayConfig;
    private final static String orderQueueName = "plugin_order_delay_30";
    public OrderPluginDelayQueue(RabbitTemplate rabbitTemplate, OrderDelayConfig orderDelayConfig) {
        this.rabbitTemplate = rabbitTemplate;
        this.orderDelayConfig = orderDelayConfig;
    }


    @Scheduled(cron = "0/30 * * * * ?")
    public void sendOrderMsg() {
        Order order;
        for (int i = 0; i < 3; i++) {
            // Thread.sleep(1000);
            order = new Order();
            order.setOrderNo(new Snowflake().nextIdStr());
            order.setCreateTime(new Date());
            rabbitSendMsg(JSON.toJSONString(order),orderDelayConfig.getPluginTimeout().intValue());
        }


    }

    
    public void rabbitSendMsg(String msg,int delayTime) {
        rabbitTemplate.convertAndSend(orderDelayConfig.getPluginOrderExchangeName(),
                orderDelayConfig.getPluginOrderRouteKey(), msg,s->{
                s.getMessageProperties().setDelay(delayTime);
                return s;
                });
    }

    //消费死信队列的消息
    @RabbitListener(queues = orderQueueName)
    public void infoConsumption(String data) throws Exception {
        //此处编写执行订单超时状态的逻辑代码
        final String nowformat = DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss");
        final Order order = JSONObject.parseObject(data, Order.class);
        final long diff = (System.currentTimeMillis() - order.getCreateTime().getTime()) / 1000;
        log.info(order.getOrderNo() + "插件实现=============订单已经超时了" + "失效时间" + diff + "秒");
    }
}

以上完成第二种方式代码
实现结果截图

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/758316.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号