栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ使用插件实现延迟消息,常见问题解决方案

RabbitMQ使用插件实现延迟消息,常见问题解决方案

文章目录

安装插件代码

注册交换机和Queue消息发送者消息消费者 常见问题

安装插件
    下载rabbitmq_delayed_message_exchange插件地址(尽量与rabbitMQ版本一致):https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases将 rabbitmq_delayed_message_exchange 插件拷贝到 rabbitMQ 安装目录下的 plugins 目录中进入RabbitMQ安装目录中的sbin目录,启动插件
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    验证是否安装成功,如果存在下图所示的交换机类型,则表示安装成功。
代码 注册交换机和Queue
package com.wcong.concise.amqp;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class RabbitMqConfiguration {

    
    public static final String QUEUE_NAME = "delay_queue_test";
    
    public static final String EXCHANGE_NAME = "delay_exchange_test";

    
    @Bean
    public CustomExchange customExchange() {
        Map args = new HashMap<>(2);
        // 定义交换机分发消息类型,direct、fanout、topic、header
        args.put("x-delayed-type", "direct");
        
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    
    @Bean
    public Queue queue() {
        
        return new Queue(QUEUE_NAME, true, false, false);
    }

    
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(customExchange()).with(QUEUE_NAME).noargs();
    }

}

消息发送者
package com.wcong.concise.test;

import com.wcong.concise.amqp.RabbitMqConfiguration;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

import java.io.UnsupportedEncodingException;
import java.time.LocalDateTime;


@SpringBootTest
@RequiredArgsConstructor
@Slf4j
public class DelayMsgApplicationTests{
    
    private final RabbitTemplate rabbitTemplate;
    @Test
    public void contextLoads() throws UnsupportedEncodingException {
        // 模拟消息
        String msg = "this is msg: " + LocalDateTime.now();
        
        rabbitTemplate.convertAndSend(RabbitMqConfiguration.EXCHANGE_NAME, RabbitMqConfiguration.QUEUE_NAME, msg, messagePostProcessor -> {
            // 延迟10s
            long delayTime = 10 * 1000L;
            messagePostProcessor.getMessageProperties().setDelay(Math.toIntExact(delayTime));
            return messagePostProcessor;
        });
        
        log.info(">>> msg send success!");
    }
}

消息消费者
package com.wcong.concise.amqp;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
@Slf4j
public class MsgReceiverTest{
    
    @RabbitListener(queues = RabbitMqConfiguration.QUEUE_NAME)
    public void handleMsg(String msg){
        log.info(">>> 接收到消息: {}", msg);
    }
    
}
常见问题

    检查交换机是否为具备延迟属性,即 Type 为 x-delayed-message

    发送延迟消息没有被消费:检查队列是否被路由到了交换机:

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/753107.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号