栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

springboot+rabbitmq做一个延时队列

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

springboot+rabbitmq做一个延时队列

相关常量:

public static String DELAY_QUEUE="delayqueue";
public static String IMMEDIATE_QUEUE="immediateqqueue";
public static String IMMEDIATE_EXCHANGE="immediatequeue";
public static String IMMEDIATE_ROUTING_KEY="immediaterouring";
public static String DELAY_ROUTING_KEY="delayrouting";
public static String DEAD_LETTER_EXCHANGE="dead_letter_exchange";

rabbitmq配置类

package com.csrcb.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


@Configuration
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    public static final String EXCHANGE_A = "my-mq-direct_exchange";
    public static final String EXCHANGE_B = "my-mq-exchange_B";
    public static final String EXCHANGE_C = "my-mq-exchange_C";


    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_A_FAIL = "QUEUE_A_FAIL";
    public static final String QUEUE_B = "QUEUE_B";

    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_A_FAIL = "spring-boot-routingKey_A_FAIL";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";

    //建立一个连接容器,类型数据库的连接池
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);//确认机制
//        connectionFactory.setPublisherReturns(true);
        //发布确认,template要求CachingConnectionFactory的publisher/confirm/is属性设置为true
        return connectionFactory;
    }

    //RabbitMQ的使用入口
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(this.connectionFactory());
        template.setMessageConverter(this.jsonMessageConverter());
        template.setMandatory(true);
        return template;
    }


    //把交换机,队列,通过路由关键字进行绑定,写在RabbitConfig类当中
    
    @Bean
    public DirectExchange testDirectExchange() {
        return new DirectExchange(EXCHANGE_A);
    }
    @Bean
    public DirectExchange testDirectExchangeB() {
        return new DirectExchange(EXCHANGE_B);
    }
    @Bean
    public DirectExchange testDirectExchangeC() {
        return new DirectExchange(QUEUE_B);
    }

    
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //队列持久
    }
    @Bean
    public Queue queueB() {
        return new Queue(QUEUE_B, true); //队列持久
    }

    @Bean
    public Binding bindingB() {
        return BindingBuilder.bind(queueB()).to(testDirectExchangeB()).with(ROUTINGKEY_B);
    }

    @Bean
    public Queue queueAFail(){
        return new Queue(QUEUE_A_FAIL, true);//队列持久
    }

    //将队列和交换机绑定, 并设置用于匹配键:spring-boot-routingKey_A
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queueA()).to(testDirectExchange()).with(ROUTINGKEY_A);
    }

    @Bean
    public Binding bindingAFail() {
        return BindingBuilder.bind(queueAFail()).to(testDirectExchange()).with(RabbitMqConfig.ROUTINGKEY_A_FAIL);
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    //一个交换机可以绑定多个消息队列,也就是消息通过一个交换机,可以分发到不同的队列当中去。
//    
//    @Bean
//    public Queue queueB() {
//        return new Queue(QUEUE_B, true); //队列持久
//    }

//    @Bean
//    public Binding bindingB(){
//        return BindingBuilder.bind(queueB()).to(defaultExchange()).with(ROUTINGKEY_B);
//    }

//    @Bean
//    public CharacterEncodingFilter characterEncodingFilter() {
//        CharacterEncodingFilter filter = new CharacterEncodingFilter();
//        filter.setEncoding("UTF-8");
//        filter.setForceEncoding(true);
//        return filter;
//    }
}

 

package com.csrcb.config;

import java.util.HashMap;
import java.util.Map;

import com.csrcb.common.Constants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;



@Configuration
public class delayqueueConfig {

    // 创建一个立即消费队列
    @Bean
    public Queue immediateQueue() {
        // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
        return new Queue(Constants.IMMEDIATE_QUEUE, true);
    }

    // 创建一个延时队列
    @Bean
    public Queue delayQueue() {
        Map params = new HashMap<>();
        // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
        params.put("x-dead-letter-exchange", Constants.IMMEDIATE_EXCHANGE);
        // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", Constants.IMMEDIATE_ROUTING_KEY);
        return new Queue(Constants.DELAY_QUEUE, true, false, false, params);
    }

    @Bean
    public DirectExchange immediateExchange() {
        // 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
        //第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
        return new DirectExchange(Constants.IMMEDIATE_EXCHANGE, true, false);
    }

    @Bean
    public DirectExchange deadLetterExchange() {
        // 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,
        //第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数
        return new DirectExchange(Constants.DEAD_LETTER_EXCHANGE, true, false);
    }

    @Bean
    //把立即消费的队列和立即消费的exchange绑定在一起
    public Binding immediateBinding() {
        return BindingBuilder.bind(immediateQueue()).to(immediateExchange()).with(Constants.IMMEDIATE_ROUTING_KEY);
    }

    @Bean
    //把立即消费的队列和立即消费的exchange绑定在一起
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(deadLetterExchange()).with(Constants.DELAY_ROUTING_KEY);
    }
}

生产者controller

@PostMapping("/senddeadqueue")
public void sendMessagefanout11111() {
  this.queueMessageService.senddelayqueue("fa song si xin dui lie",20000l);
}
public void senddelayqueue(String user,Long time) {
    log.info("消息已经发送,时间为:{}",new Timestamp(System.currentTimeMillis()));
    String callBackId = UUID.randomUUID().toString();
    CorrelationData correlationId = new CorrelationData(callBackId);
    this.rabbitTemplate.convertAndSend(
            Constants.DEAD_LETTER_EXCHANGE,
            // routingKey
            Constants.DELAY_ROUTING_KEY,
            user,
            message -> {
                // 设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(time));
                return message;




            },correlationId);
}



@Override
public void confirm(CorrelationData correlationData, boolean ack, String s) {
    log.info(" 回调id:" + correlationData.getId());
    if (ack) {
        log.info("消息发送成功");
    } else {
        log.info("消息发送失败:" + s);
    }
}

消费者消费

@RabbitListener(queues = "immediateqqueue")
@RabbitHandler
public void consumeMessagedelaymessage(Message message){
    log.info("收到的消息fouantC:{}",message);
}

pom.xml


    org.springframework.boot
    spring-boot-starter-amqp
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/295075.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号