栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

rabbitmq安装延时队列插件实现延时队列

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

rabbitmq安装延时队列插件实现延时队列

下载插件地址

要注意和自己的rabbitmq的版本对应起来
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

我的mq是docker安装的3.9.7的

下载完之后把插件copy到mq的plugin目录下,然后启用插件。之后重启容器,我这里是docker-compose安装的

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
docker-compose restart


进入rabbitmq管理页面查看插件是否安装成功

在Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。

使用mq延时队列插件下springboot实现延时队列

yaml配置mq,然后在mq管理页面创建虚拟host:fchan

spring:
  rabbitmq:
    host: 110.40.181.73
    port: 35672
    username: root
    password: 10086
    virtual-host: /fchan

配置延时队列和延时交换机的绑定

package com.fchan.mq.mqDelay;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqDelayConfig {

    //最后经过死信队列转发后实际消费的交换机
    private static final String EXCHANGE_NAME = "delayed_exchange";
    //最后经过死信队列转发后实际消费的队列
    private static final String QUEUE_NAME = "delayed_queue";
    //最后经过死信队列转发后实际消费的路由key
    private static final String ROUTE_KEY = "delayed_key";

    
    @Bean
    CustomExchange exchange() {
        //通过x-delayed-type参数设置fanout /direct / topic / header 类型
        Map args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message",true, false,args);
    }

    
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME,true,false,false);
    }

    
    @Bean
    public Binding binding(CustomExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTE_KEY)
                .noargs();
    }
}

消息生产者

package com.fchan.mq.mqDelay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class MyRabbitSender {

    Logger log = LoggerFactory.getLogger(MyRabbitSender.class);

    private static final String ROUTE_KEY = "delayed_key";
    private static final String EXCHANGE_NAME = "delayed_exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    public void send2(String msg, int delay) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);  //消息持久化
            message.getMessageProperties().setDelay(delay * 1000);   // 单位为毫秒
            return message;
        });
    }
}

消息消费者

package com.fchan.mq.mqDelay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyRabbitConsume {
    Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);

    @RabbitListener(queues = "delayed_queue")
    public void infoConsumption(String data) throws Exception {
        log.info("收到信息:{}",data);
        log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
    }
}

参考了大佬的博文
https://juejin.cn/post/6977516798828609567#heading-13

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/332319.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号