栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ——延时队列

RabbitMQ——延时队列

延时队列 概念

延时队列其实是死信队列的中消息超时的一种演变,当普通队列没有消费者时,设置了过期时间的消息都会转发到死信队列,交由死信队列的消费者操作,而当满足以上情形的死信队列,我们称之为延时队列。

具体的应用场景

订单在十分钟之内未支付则自动取消新创建的店铺,如果十天内没有上传过商品,则自动发送消息提醒用户注册成功后,如果三天内没有登录则进行短信提醒

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务

本文贯彻的例子

如下图所示,生产者发送一条消息通过交换机分别给QA、QB队列,设置过期时间,当过期时间到达时,转发消息给死信交换机,再由死信队列发送给消费者。

完成基本功能

使用springboot框架快速搭建一个web项目

引入依赖


    org.springframework.boot
    spring-boot-starter



    org.springframework.boot
    spring-boot-starter-amqp


    org.springframework.boot
    spring-boot-starter-web


    org.springframework.boot
    spring-boot-starter-test
    test


    com.alibaba
    fastjson
    1.2.47


    org.projectlombok
    lombok



    io.springfox
    springfox-swagger2
    2.9.2


    io.springfox
    springfox-swagger-ui
    2.9.2



    org.springframework.amqp
    spring-rabbit-test
    test

配置环境

spring:
  rabbitmq:
    host: 121.40.45.37
    port: 5672
    username: admin
    password: Hkx123

添加swagger配置类

@Configuration
@EnableSwagger2
public class SwaggerConfig {
    @Bean
    public Docket webApiConfig(){
        return new Docket(documentationType.SWAGGER_2)
                .groupName("webApi")
                .apiInfo(webApiInfo())
                .select()
                .build();
    }
    private ApiInfo webApiInfo(){
        return new ApiInfoBuilder()
                .title("rabbitmq 接口文档")
                .description("本文档描述了 rabbitmq 微服务接口定义")
                .version("1.0")
                .contact(new Contact("yellowstar", "http://yellowstar.top","614028802@qq.com"))
                .build();
    }
}

添加交换机与队列的配置类

在之前的学习过程中,我们都将交换机与队列的配置过程放在消费者代码中,而在springboot中,我们需要新建一个配置类,将交换机与队列的配置放在其中。

@Configuration
public class TtlQueueConfig {
    //设置普通交换机
    public static String X_EXCHANGE = "X";
    //设置死信交换机
    public static String Y_DEAD_LETTER_EXCHANGE = "Y";
    //设置两个普通队列
    public static String QUEUE_A = "QA";
    public static String QUEUE_B = "QB";
    //设置死信队列
    public static String DEAD_LETTER_QUEUE = "QD";

    //声明普通交换机
    @Bean("Xexchange")
    public DirectExchange Xexchange(){
        return new DirectExchange(X_EXCHANGE);
    }
    //声明死信交换机
    @Bean("Yexchange")
    public DirectExchange Yexchange(){
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    //声明普通队列A,过期时间10s
    @Bean("queueA")
    public Queue queueA(){
        HashMap argument = new HashMap<>(3);
        //设置死信交换机
        argument.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信routingKey
        argument.put("x-dead-letter-routing-key","YD");
        //声明队列过期时间,党委为毫秒
        argument.put("x-message-ttl",10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(argument).build();
    }

    //声明普通队列A绑定普通交换机
    @Bean
    public Binding queueAbindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("Xexchange") DirectExchange Xexchange){
        return BindingBuilder.bind(queueA).to(Xexchange).with("XA");
    }

    //声明普通队列B,过期时间30s
    @Bean("queueB")
    public Queue queueB(){
        HashMap argument = new HashMap<>(3);
        //设置死信交换机
        argument.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
        //设置死信routingKey
        argument.put("x-dead-letter-routing-key","YD");
        //声明队列过期时间,党委为毫秒
        argument.put("x-message-ttl",30000);
        return QueueBuilder.durable(QUEUE_B).withArguments(argument).build();
    }

    //声明普通队列B绑定普通交换机
    @Bean
    public Binding queueBbindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("Xexchange") DirectExchange Xexchange){
        return BindingBuilder.bind(queueB).to(Xexchange).with("XB");
    }

    //声明死信队列
    @Bean("queueD")
    public Queue queueD(){
        return new Queue(DEAD_LETTER_QUEUE);
    }

    //声明死信队列绑定死信交换机
    @Bean
    public Binding queueDbindingY(@Qualifier("queueD") Queue queueD,
                                  @Qualifier("Yexchange") DirectExchange Yexchange){
        return BindingBuilder.bind(queueD).to(Yexchange).with("YD");
    }
}

编写生产者代码

我们要求访问 http://localhost:8080/ttl/sendMsg/xxx,最后的xxx为我们需要发送的信息

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("当前时间:{},生产者发送消息:{}",new Date().toString(),message);
        //像队列A和队列B分别发送消息
        rabbitTemplate.convertAndSend("X","XA","消息来源10s的队列A:" + message);
        rabbitTemplate.convertAndSend("X","XB","消息来源30s的队列B:" + message);
    }
}

编写消费者代码

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
    }
}

测试

启动服务,访问 http://localhost:8080/ttl/sendMsg/helloRabbitMQ ,查看控制台输出

可以看到控制台输出的间隔时间为10s和30s,这说明我们的案例成功了

延时队列优化

上述例子虽然实现了延时队列的效果,但是有那么一个问题,我们设置了两个队列10秒和30秒,当我们的需求新增一个60秒的队列怎么办?手动在增加一个吗?如果新增100个队列呢?所以我们就在想,能不能在配置文件中不设置过期时间,交给客户端来设置。以下QC队列就是我们需要实现的

更新配置文件

新增以下内容

//新增普通队列
public static String QUEUE_C = "QC";
//声明普通队列C
@Bean("queueC")
public Queue queueC(){
    HashMap argument = new HashMap<>(3);
    //设置死信交换机
    argument.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
    //设置死信routingKey
    argument.put("x-dead-letter-routing-key","YD");
    return QueueBuilder.durable(QUEUE_C).withArguments(argument).build();
}

更新生产者代码

新增以下内容

@GetMapping("/sendTTLMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
    log.info("当前时间:{},生产者发送消息:{},消息延迟时间为:{}",new Date().toString(),message,ttlTime);
    rabbitTemplate.convertAndSend("X","QC","消息来源队列C:" + message,correlationData -> {
        //设置过期时间
        correlationData.getMessageProperties().setExpiration(ttlTime);
        return correlationData;
    });
}

测试

发送以下两个请求测试

http://localhost:8080/ttl/sendTTLMsg/hello1/20000http://localhost:8080/ttl/sendTTLMsg/hello2/2000

我们发现是可以以进行通过一个队列发送两条不同的延时消息的

基于死信队列延时消息的问题

通过上面例子我们可以看到,虽然发送并接收到了两条延时消息,但是延时2秒的消息是在延时20秒的消息之后接收到的,这其实一个比较大的bug,因为队列是有序的,先进先出,所以即使你超时时间设置的再快,也得等前面的消息被处理了才能轮得到你。

这个问题需要使用插件才能解决

基于插件解决延时消息

安装插件

小黄的rabbitmq是部署在服务器上使用docker运行的,安装插件的过程可以参考 [Docker安装Rabbitmq及其延时队列插件]

安装完成之后,在web端我们可以看到延时消息类型的交换机

原理

基于死信队列的延时消息,我们是在消息进入队列的时候,设置消息的过期时间,而基于插件的延时消息,交换机本身就有一个延时的功能,等到时间到了在交给队列发送给消费者

编写配置类

@Configuration
public class DelayedQueueConfig {
    //声明延时交换机名称
    public static String DELAYED_EXCHANGE = "delayed_exchange";
    //声明延时routingkey
    public static String DELAYED_ROUTINGKEY = "delayed_routing_key";
    //声明延时队列名称
    public static String DELAYED_QUEUE = "delayed_queue";

    //声明队列
    @Bean
    public Queue delayedQueue(){
        return new Queue(DELAYED_QUEUE);
    }

    //声明交换机
    @Bean
    public CustomExchange delayedExchange(){
        HashMap arguments = new HashMap<>();
        //设置自定义交换机类型
        arguments.put("x-delayed-type","direct");
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
    }

    //绑定交换机和队列
    @Bean
    public Binding delayedQueueBindingDelayedExchange(
            @Qualifier("delayedQueue") Queue delayedQueue,
            @Qualifier("delayedExchange") CustomExchange delayedExchange
    ){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTINGKEY).noargs();
    }
}

编写生产者

@GetMapping("/sendDelayedMsg/{message}/{delayedTime}")
public void sendDelayedMsg(@PathVariable String message,@PathVariable Integer delayedTime){
    log.info("当前时间:{},生产者发送消息:{},消息延迟时间为:{}",new Date().toString(),message,delayedTime);
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE,DelayedQueueConfig.DELAYED_ROUTINGKEY, message
                                  ,correlationData -> {
                                      correlationData.getMessageProperties().setDelay(delayedTime);
                                      return correlationData;
                                  });
}

编写消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
    }
}

测试

我们还是先发一个延时20s的内容,再发一个延时2s的内容

http://localhost:8080/ttl/sendDelayedMsg/hello1/20000http://localhost:8080/ttl/sendDelayedMsg/hello2/2000

观察控制台输出,可以发现与我们的预期效果一样,完美!!!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/711437.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号