栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq之延迟队列

RabbitMq之延迟队列

延迟队列 1. 延迟队列概念

延迟队列,队列内部是有序的,最重要的特性就体现在它的延迟属性上,延迟队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。

2. 延迟队列使用场景

1.用户下单以后超出支付时间则自动取消
2.淘宝七天自动确认收货
3.用户退款,7天类商家没有处理,自动退款

3. 延迟队列实现方式

1.为队列设置延迟时间 通过设置属性 " x-message-ttl " ,但是使用设置队列属性的方式的话,那么每增加一个新的时间需求就会增加一个新的队列,这样就会导致队列堆积过多。

 arguments.put("x-message-ttl", 10000);

2.为消息设置延迟时间 通过在消息属性上面设置TTL,但是可能会导致消息不会按时死亡,RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列。尤其是第一个消息时间的延迟很长 30s,第二个消息的延迟很短 10s,第二个消息也不会优先执行,并且10s过去以后,第二条消息不会立即过期,而是会等第一条消息被消费后,消费第二条消息,才会判断过期,这时候已经过去40s了

rabbitTemplate.convertAndSend("X","XC",message,msg -> {
            //发送消息  延迟时长  为消息设置延迟时长
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });

3.安装延时队列插件

在官网上下载 https://www.rabbitmq.com/community-plugins.html,下载
rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。
进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ

执行命令: cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
再执行命令:  rabbitmq-plugins enable rabbitmq_delayed_message_exchange

延迟队列配置
引入依赖


        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
        
            com.alibaba
            fastjson
            1.2.47
        
        
            org.projectlombok
            lombok
        
        
        
            io.springfox
            springfox-swagger2
            2.9.2
        

        
            io.springfox
            springfox-swagger-ui
            2.9.2
        
        
        
            org.springframework.amqp
            spring-rabbit-test
            test
        
    

配置文件

server:
  port: 8080
spring:
  rabbitmq:
    host: 192.xxx.xxx.xx
    password: 123456
    port: 5672
    username: admin



package com.atguigu.rabbitmq.springbootrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;



@Configuration
public class DelayedQueueConfig {

    //队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //路由
    public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

    //
    @Bean("delayedQueue")
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //声明交换机
    @Bean("delayedExchange")
    public CustomExchange delayedExchange(){
        Map arg = new HashMap<>();
        arg.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arg);
    }

    @Bean
    public Binding delayedExchangeBindingdelayedQueue(@Qualifier("delayedQueue")Queue delayedQueue , @Qualifier("delayedExchange")CustomExchange delayedExchange){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者

package com.atguigu.rabbitmq.springbootrabbitmq.controller;

import com.atguigu.rabbitmq.springbootrabbitmq.config.DelayedQueueConfig;
import io.swagger.models.auth.In;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    private RabbitTemplate rabbitTemplate ;

    
    @GetMapping("sendDelayMsg/{message}/{delayTime}")
    public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
        log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(),delayTime, message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
            //发送消息  延迟时长
            msg.getMessageProperties().setDelay(delayTime);
            return msg;
        });

    }


}

消费者

package com.atguigu.rabbitmq.springbootrabbitmq.entity;

import com.atguigu.rabbitmq.springbootrabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;


@Slf4j
@Component
public class DelayedConsumoer {

    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayed(Message message){
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
    }
}

4. 总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/307704.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号