栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMq工具类

RabbitMq工具类

为了解决 RabbitMQ 中交换机、队列以及队列绑定配置繁琐的问题,本工具类实现了交换机和队列的动态注入,并提供了统一的消息发送公共方法。
    定义注解
package com.kcsm.ipp.commons.annotation;

import com.kcsm.ipp.commons.config.RabbitMqBeanDefinitionRegister;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.import;

import java.lang.annotation.*;


@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@documented
@ConditionalOnClass(value = {ConnectionFactory.class,RabbitTemplate.class, RabbitAdmin.class})
@import(value = {RabbitMqBeanDefinitionRegister.class})
public @interface EnableMq {
}

    实现动态注入
package com.kcsm.ipp.commons.config;

import com.kcsm.ipp.commons.enums.rabbitmq.DelayQueueEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.ExchangeEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.QueueEnum;
import com.kcsm.ipp.commons.service.impl.RabbitMqServiceImpl;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;

import java.util.Map;
import java.util.function.Supplier;


/**
 * Registers the RabbitMQ infrastructure beans and one bean per entry of
 * {@link ExchangeEnum}, {@link QueueEnum} and {@link DelayQueueEnum}.
 *
 * <p>Connection settings are read from the {@code spring.rabbitmq.*} properties
 * in {@link #setEnvironment(Environment)}. A property that is absent is simply
 * not applied, so {@link CachingConnectionFactory} keeps its own default for it.
 */
public class RabbitMqBeanDefinitionRegister implements ImportBeanDefinitionRegistrar, EnvironmentAware {

    private String host;
    private Integer port;
    private String virtualHost;
    private String username;
    private String password;

    /**
     * Captures the broker connection settings.
     *
     * @param environment source of the {@code spring.rabbitmq.*} properties
     * @throws IllegalArgumentException if {@code spring.rabbitmq.port} is present but not an integer
     */
    @Override
    public void setEnvironment(Environment environment) {
        // The string properties are read first so that a bad port can never
        // prevent them from being assigned.
        this.host = environment.getProperty("spring.rabbitmq.host");
        this.virtualHost = environment.getProperty("spring.rabbitmq.virtual-host");
        this.username = environment.getProperty("spring.rabbitmq.username");
        this.password = environment.getProperty("spring.rabbitmq.password");
        this.port = parsePort(environment.getProperty("spring.rabbitmq.port"));
    }

    /**
     * Registers all bean definitions.
     *
     * @param importingClassMetadata metadata of the importing class (unused)
     * @param registry               registry that receives the bean definitions
     */
    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        registerInfrastructure(registry);

        for (ExchangeEnum exchange : ExchangeEnum.values()) {
            registerExchange(registry, exchange);
        }
        for (QueueEnum queue : QueueEnum.values()) {
            registerQueue(registry, queue);
        }
        for (DelayQueueEnum delayQueue : DelayQueueEnum.values()) {
            registerDelayQueue(registry, delayQueue);
        }
    }

    /** Converts the raw port property to an {@code Integer}; {@code null} when the property is absent or blank. */
    private static Integer parsePort(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return null;
        }
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid spring.rabbitmq.port: '" + raw + "'", e);
        }
    }

    /** Builds the connection factory, applying only the settings that were actually configured. */
    private CachingConnectionFactory createConnectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        if (host != null) {
            factory.setHost(host);
        }
        if (port != null) {
            factory.setPort(port);
        }
        if (username != null) {
            factory.setUsername(username);
        }
        if (password != null) {
            factory.setPassword(password);
        }
        if (virtualHost != null) {
            factory.setVirtualHost(virtualHost);
        }
        return factory;
    }

    /** Registers the connection factory, RabbitAdmin, RabbitTemplate and RabbitMqServiceImpl under their class names. */
    private void registerInfrastructure(BeanDefinitionRegistry registry) {
        String connectionFactoryName = CachingConnectionFactory.class.getName();
        String adminName = RabbitAdmin.class.getName();
        String templateName = RabbitTemplate.class.getName();

        register(registry, connectionFactoryName, CachingConnectionFactory.class, this::createConnectionFactory);

        BeanDefinition admin = BeanDefinitionBuilder.rootBeanDefinition(RabbitAdmin.class)
                .addConstructorArgReference(connectionFactoryName)
                .addPropertyValue("autoStartup", true)
                .getBeanDefinition();
        registry.registerBeanDefinition(adminName, admin);

        BeanDefinition template = BeanDefinitionBuilder.rootBeanDefinition(RabbitTemplate.class)
                .addConstructorArgReference(connectionFactoryName)
                .getBeanDefinition();
        registry.registerBeanDefinition(templateName, template);

        BeanDefinition service = BeanDefinitionBuilder.rootBeanDefinition(RabbitMqServiceImpl.class)
                .addConstructorArgReference(templateName)
                .addConstructorArgReference(adminName)
                .getBeanDefinition();
        registry.registerBeanDefinition(RabbitMqServiceImpl.class.getName(), service);
    }

    /** Registers one exchange bean, named after the exchange, with the concrete type matching its {@code ExchangeType}. */
    private static void registerExchange(BeanDefinitionRegistry registry, ExchangeEnum exchange) {
        String name = exchange.getName();
        switch (exchange.getType()) {
            case topic:
                register(registry, name, TopicExchange.class,
                        () -> new TopicExchange(name, exchange.getDurable(), false));
                break;
            case direct:
                register(registry, name, DirectExchange.class,
                        () -> new DirectExchange(name, exchange.getDurable(), false));
                break;
            case fanout:
                register(registry, name, FanoutExchange.class,
                        () -> new FanoutExchange(name, exchange.getDurable(), false));
                break;
            default:
                // Fail loudly instead of silently leaving the exchange unregistered.
                throw new IllegalStateException("Unsupported exchange type: " + exchange.getType());
        }
    }

    /**
     * Registers one plain queue bean, named after the queue. Durability and
     * arguments come from the enum so the bean matches what
     * {@code RabbitMqServiceImpl} would declare for the same entry.
     */
    private static void registerQueue(BeanDefinitionRegistry registry, QueueEnum queue) {
        register(registry, queue.getName(), Queue.class,
                () -> new Queue(queue.getName(), queue.getDurable(), false, false, queue.getArguments()));
    }

    /** Registers one durable delay queue bean carrying the TTL and dead-letter arguments. */
    private static void registerDelayQueue(BeanDefinitionRegistry registry, DelayQueueEnum delayQueue) {
        Map<String, Object> params = DelayQueueEnum.params(
                delayQueue.getDeadExchangeEnum().getName(),
                delayQueue.getDeadQueueEnum().getName(),
                delayQueue.getTtl());
        register(registry, delayQueue.getName(), Queue.class,
                () -> QueueBuilder.durable(delayQueue.getName()).withArguments(params).build());
    }

    /** Registers a supplier-backed bean definition of the given type under {@code beanName}. */
    private static <T> void register(BeanDefinitionRegistry registry, String beanName,
                                     Class<T> beanType, Supplier<T> instanceSupplier) {
        registry.registerBeanDefinition(beanName,
                BeanDefinitionBuilder.genericBeanDefinition(beanType, instanceSupplier).getBeanDefinition());
    }

}

    定义交换机,队列,延迟队列
package com.kcsm.ipp.commons.enums.rabbitmq;


/**
 * The exchanges this module knows about: a default, a delay and a dead-letter
 * exchange.
 *
 * <p>Auto-delete is not modelled here; the code that builds exchanges from this
 * enum passes {@code false} for it.
 */
public enum ExchangeEnum {

    DEFAULT_EXCHANGE(
            "default_exchange", ExchangeType.direct, true, "默认交换机"),
    DEFAULT_DELAY_EXCHANGE(
            "default_delay_exchange", ExchangeType.direct, true, "默认延迟交换机"),
    DEFAULT_DEAD_EXCHANGE(
            "default_dead_exchange", ExchangeType.direct, true, "默认死信交换机");

    /** Exchange name. */
    private final String name;

    /** Exchange type. */
    private final ExchangeType type;

    /** When {@code true} the exchange is durable, i.e. it still exists after a restart. */
    private final Boolean durable;

    /** Human-readable description. */
    private final String desc;

    ExchangeEnum(String name, ExchangeType type, Boolean durable, String desc) {
        this.desc = desc;
        this.durable = durable;
        this.type = type;
        this.name = name;
    }

    /** @return the exchange name */
    public String getName() {
        return this.name;
    }

    /** @return the exchange type */
    public ExchangeType getType() {
        return this.type;
    }

    /** @return whether the exchange is durable */
    public Boolean getDurable() {
        return this.durable;
    }

    /** @return the human-readable description */
    public String getDesc() {
        return this.desc;
    }
}

package com.kcsm.ipp.commons.enums.rabbitmq;


/**
 * Exchange kinds supported by {@code ExchangeEnum}. The constant names are
 * lower-case and must stay that way: they are used as {@code switch} labels
 * elsewhere.
 */
public enum ExchangeType {
    /** Direct exchange. */
    direct,

    /** Fanout exchange. */
    fanout,

    /** Topic exchange. */
    topic;
}

package com.kcsm.ipp.commons.enums.rabbitmq;

import lombok.Getter;

import java.util.Map;


/**
 * The plain (non-delay) queues this module knows about. Accessors are
 * generated by Lombok's {@code @Getter}.
 */
@Getter
public enum QueueEnum {
    DEFAULT_QUEUE("default_queue", true, null, "默认队列"),

    DEFAULT_DEAD_QUEUE("default_dead_queue", true, null, "默认死信队列"),

    SERVICE_NOTIFY_DEAD_QUEUE("service_notify_dead_queue", true, null, "业务回调死信队列"),
    ;

    /** Queue name. */
    private final String name;

    /** Whether the queue is durable. */
    private final Boolean durable;

    /** Optional queue arguments; {@code null} when the queue has none. */
    private final Map<String, Object> arguments;

    /** Human-readable description. */
    private final String desc;

    QueueEnum(String name, Boolean durable, Map<String, Object> arguments, String desc) {
        this.name = name;
        this.durable = durable;
        this.arguments = arguments;
        this.desc = desc;
    }
}
package com.kcsm.ipp.commons.enums.rabbitmq;

import lombok.Getter;

import java.util.HashMap;
import java.util.Map;


/**
 * Delay queues: each entry names a queue, a TTL, and the dead-letter exchange
 * and queue that {@link #params(String, String, Long)} writes into the queue
 * arguments. Accessors are generated by Lombok's {@code @Getter}.
 */
@Getter
public enum DelayQueueEnum {
    DEFAULT_DELAY_QUEUE("default_delay_queue", true, 5000L, ExchangeEnum.DEFAULT_DEAD_EXCHANGE, QueueEnum.DEFAULT_DEAD_QUEUE, "默认延迟队列"),

    // QueueEnum defines SERVICE_NOTIFY_DEAD_QUEUE; the previously referenced
    // PAYMENT_SERVICE_NOTIFY_DEAD_QUEUE constant does not exist.
    SERVICE_NOTIFY_DELAY_QUEUE_15000("service_notify_delay_queue_15000", true, 15000L, ExchangeEnum.DEFAULT_DEAD_EXCHANGE, QueueEnum.SERVICE_NOTIFY_DEAD_QUEUE, "业务回调延迟队列"),
    SERVICE_NOTIFY_DELAY_QUEUE_30000("service_notify_delay_queue_30000", true, 30000L, ExchangeEnum.DEFAULT_DEAD_EXCHANGE, QueueEnum.SERVICE_NOTIFY_DEAD_QUEUE, "业务回调延迟队列"),
    ;

    /** Queue name. */
    private final String name;

    /** Whether the queue is durable. */
    private final Boolean durable;

    /** Delay in milliseconds. */
    private final Long ttl;

    /** Dead-letter exchange. */
    private final ExchangeEnum deadExchangeEnum;

    /** Dead-letter queue; its name is used as the dead-letter routing key. */
    private final QueueEnum deadQueueEnum;

    /** Human-readable description. */
    private final String desc;

    DelayQueueEnum(String name, Boolean durable, Long ttl, ExchangeEnum deadExchangeEnum, QueueEnum deadQueueEnum, String desc) {
        this.name = name;
        this.durable = durable;
        this.ttl = ttl;
        this.deadExchangeEnum = deadExchangeEnum;
        this.deadQueueEnum = deadQueueEnum;
        this.desc = desc;
    }

    /**
     * Builds the queue arguments for a delay queue.
     *
     * @param deadExchange name of the dead-letter exchange
     * @param deadQueue    value stored as the dead-letter routing key
     * @param ttl          message time-to-live in milliseconds
     * @return a new mutable map holding {@code x-message-ttl},
     *         {@code x-dead-letter-exchange} and {@code x-dead-letter-routing-key}
     */
    public static Map<String, Object> params(String deadExchange, String deadQueue, Long ttl) {
        Map<String, Object> map = new HashMap<>();
        // Message expiry, in milliseconds.
        map.put("x-message-ttl", ttl);
        // Exchange that receives messages once they expire.
        map.put("x-dead-letter-exchange", deadExchange);
        // Routing key applied when an expired message is re-routed.
        map.put("x-dead-letter-routing-key", deadQueue);
        return map;
    }

}

    定义统一发送消息service
package com.kcsm.ipp.commons.service;

import com.kcsm.ipp.commons.enums.rabbitmq.DelayQueueEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.ExchangeEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.QueueEnum;


/**
 * Single entry point for publishing messages, either directly or through a
 * delay queue.
 */
public interface MqService {

    // ---- direct delivery ----

    /**
     * Publishes {@code msg} to the given queue; the exchange is chosen by the implementation.
     *
     * @param msg       payload to publish
     * @param queueEnum target queue
     * @throws Exception if the message cannot be published
     */
    void send(Object msg, QueueEnum queueEnum) throws Exception;

    /**
     * Publishes {@code msg} to the given queue through the given exchange.
     *
     * @param msg          payload to publish
     * @param exchangeEnum exchange to publish to
     * @param queueEnum    target queue
     * @throws Exception if the message cannot be published
     */
    void send(Object msg, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception;

    // ---- delayed delivery ----

    /**
     * Publishes {@code msg} to the given delay queue; the exchange is chosen by the implementation.
     *
     * @param msg            payload to publish
     * @param delayQueueEnum target delay queue
     * @throws Exception if the message cannot be published
     */
    void sendDelay(Object msg, DelayQueueEnum delayQueueEnum) throws Exception;

    /**
     * Publishes {@code msg} to the given delay queue through the given exchange.
     *
     * @param msg               payload to publish
     * @param delayExchangeEnum exchange to publish to
     * @param delayQueueEnum    target delay queue
     * @throws Exception if the message cannot be published
     */
    void sendDelay(Object msg, ExchangeEnum delayExchangeEnum, DelayQueueEnum delayQueueEnum) throws Exception;
}

package com.kcsm.ipp.commons.service.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.kcsm.ipp.commons.enums.rabbitmq.DelayQueueEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.ExchangeEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.QueueEnum;
import com.kcsm.ipp.commons.service.MqService;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.util.Map;


/**
 * {@link MqService} backed by a {@link RabbitTemplate} and a {@link RabbitAdmin}.
 *
 * <p>Every send first declares the binding between the exchange and the queue
 * (routing key = queue name), then publishes the payload serialized as JSON bytes.
 */
public class RabbitMqServiceImpl implements MqService {

    /** Shared serializer; reused instead of building a new mapper for every message. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final RabbitTemplate rabbitTemplate;
    private final RabbitAdmin rabbitAdmin;

    /**
     * @param rabbitTemplate template used to publish messages
     * @param rabbitAdmin    admin used to declare bindings, exchanges and queues
     */
    public RabbitMqServiceImpl(RabbitTemplate rabbitTemplate, RabbitAdmin rabbitAdmin) {
        this.rabbitTemplate = rabbitTemplate;
        this.rabbitAdmin = rabbitAdmin;
    }

    /** Publishes to {@code queueEnum} through {@link ExchangeEnum#DEFAULT_EXCHANGE}. */
    @Override
    public void send(Object msg, QueueEnum queueEnum) throws Exception {
        this.send(msg, ExchangeEnum.DEFAULT_EXCHANGE, queueEnum);
    }

    /** Binds the queue to the exchange, then publishes with the queue name as routing key. */
    @Override
    public void send(Object msg, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception {
        this.bindExchangeAndQueue(exchangeEnum, queueEnum);
        this.publish(msg, exchangeEnum.getName(), queueEnum.getName());
    }

    /**
     * Publishes to {@code delayQueueEnum} through {@link ExchangeEnum#DEFAULT_EXCHANGE}.
     */
    // NOTE(review): ExchangeEnum also defines DEFAULT_DELAY_EXCHANGE, which nothing uses;
    // confirm whether that was the intended default here.
    @Override
    public void sendDelay(Object msg, DelayQueueEnum delayQueueEnum) throws Exception {
        this.sendDelay(msg, ExchangeEnum.DEFAULT_EXCHANGE, delayQueueEnum);
    }

    /**
     * Binds the dead-letter queue to its exchange and the delay queue to
     * {@code delayExchangeEnum}, then publishes with the delay queue name as routing key.
     */
    @Override
    public void sendDelay(Object msg, ExchangeEnum delayExchangeEnum, DelayQueueEnum delayQueueEnum) throws Exception {
        // Dead-letter side first, then the delay side.
        this.bindExchangeAndQueue(delayQueueEnum.getDeadExchangeEnum(), delayQueueEnum.getDeadQueueEnum());
        this.bindDelayExchangeAndQueue(delayExchangeEnum, delayQueueEnum);

        this.publish(msg, delayExchangeEnum.getName(), delayQueueEnum.getName());
    }

    /** Serializes {@code msg} to JSON bytes and hands the resulting message to the template. */
    private void publish(Object msg, String exchangeName, String routingKey) throws Exception {
        Message message = MessageBuilder.withBody(OBJECT_MAPPER.writeValueAsBytes(msg)).build();
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
    }

    /**
     * Declares the exchange described by {@code exchangeEnum}.
     * Not called from within this class.
     *
     * @throws IllegalArgumentException if the exchange type is not supported
     */
    private void createMyExchange(ExchangeEnum exchangeEnum) {
        String name = exchangeEnum.getName();
        switch (exchangeEnum.getType()) {
            case direct:
                rabbitAdmin.declareExchange(new DirectExchange(name, exchangeEnum.getDurable(), false));
                break;
            case topic:
                rabbitAdmin.declareExchange(new TopicExchange(name, exchangeEnum.getDurable(), false));
                break;
            case fanout:
                rabbitAdmin.declareExchange(new FanoutExchange(name, exchangeEnum.getDurable(), false));
                break;
            default:
                throw new IllegalArgumentException("请指定交换机类型");
        }
    }

    /**
     * Declares the queue described by {@code queueEnum}, passing its arguments
     * only when there are any. Not called from within this class.
     */
    private void createMyQueue(QueueEnum queueEnum) {
        if (null != queueEnum.getArguments() && queueEnum.getArguments().size() > 0) {
            rabbitAdmin.declareQueue(new Queue(queueEnum.getName(), queueEnum.getDurable(), false, false, queueEnum.getArguments()));
            return;
        }
        rabbitAdmin.declareQueue(new Queue(queueEnum.getName(), queueEnum.getDurable()));
    }

    /**
     * Declares the durable delay queue described by {@code delayQueueEnum}
     * with its TTL and dead-letter arguments. Not called from within this class.
     */
    private void createMyDelayQueue(DelayQueueEnum delayQueueEnum) {
        Map<String, Object> params = DelayQueueEnum.params(
                delayQueueEnum.getDeadExchangeEnum().getName(),
                delayQueueEnum.getDeadQueueEnum().getName(),
                delayQueueEnum.getTtl());
        Queue queue = QueueBuilder.durable(delayQueueEnum.getName()).withArguments(params).build();
        rabbitAdmin.declareQueue(queue);
    }

    /** Binds a plain queue to an exchange, using the queue name as routing key. */
    private void bindExchangeAndQueue(ExchangeEnum exchangeEnum, QueueEnum queueEnum) {
        this.bind(exchangeEnum.getName(), queueEnum.getName());
    }

    /** Binds a delay queue to an exchange, using the queue name as routing key. */
    private void bindDelayExchangeAndQueue(ExchangeEnum exchangeEnum, DelayQueueEnum delayQueueEnum) {
        this.bind(exchangeEnum.getName(), delayQueueEnum.getName());
    }

    /** Declares a queue binding whose routing key equals the queue name and that carries no arguments. */
    private void bind(String exchangeName, String queueName) {
        rabbitAdmin.declareBinding(new Binding(
                queueName,
                Binding.DestinationType.QUEUE,
                exchangeName,
                queueName,
                null
        ));
    }

}

    demo
    发送消息
// MqService has no single-argument send; the target queue must be given.
mqService.send(msg, QueueEnum.DEFAULT_QUEUE);
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/751334.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号