- 定义注解
package com.kcsm.ipp.commons.annotation;
import com.kcsm.ipp.commons.config.RabbitMqBeanDefinitionRegister;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.context.annotation.import;
import java.lang.annotation.*;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@documented
@ConditionalOnClass(value = {ConnectionFactory.class,RabbitTemplate.class, RabbitAdmin.class})
@import(value = {RabbitMqBeanDefinitionRegister.class})
public @interface EnableMq {
}
- 实现动态注入
package com.kcsm.ipp.commons.config;
import com.kcsm.ipp.commons.enums.rabbitmq.DelayQueueEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.ExchangeEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.QueueEnum;
import com.kcsm.ipp.commons.service.impl.RabbitMqServiceImpl;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.importBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.Annotationmetadata;
import java.util.Map;
public class RabbitMqBeanDefinitionRegister implements importBeanDefinitionRegistrar, EnvironmentAware {
private String host;
private Integer port;
private String virtualHost;
private String username;
private String password;
@Override
public void setEnvironment(Environment environment) {
try {
this.host = environment.getProperty("spring.rabbitmq.host");
this.port = Integer.valueOf(environment.getProperty("spring.rabbitmq.port"));
this.virtualHost = environment.getProperty("spring.rabbitmq.virtual-host");
this.username = environment.getProperty("spring.rabbitmq.username");
this.password = environment.getProperty("spring.rabbitmq.password");
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
@Override
public void registerBeanDefinitions(Annotationmetadata importingClassmetadata, BeanDefinitionRegistry registry) {
//注入CachingConnectionFactory
BeanDefinition connectionFactoryBeanDefinition = BeanDefinitionBuilder.genericBeanDefinition(CachingConnectionFactory.class,()->{
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(host);
cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setVirtualHost(virtualHost);
return cachingConnectionFactory;
}).getBeanDefinition();
registry.registerBeanDefinition(CachingConnectionFactory.class.getName(), connectionFactoryBeanDefinition);
//注入RabbitAdmin
BeanDefinition rabbitAdminBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(RabbitAdmin.class)
.addConstructorArgReference(CachingConnectionFactory.class.getName())
.addPropertyValue("autoStartup",true)
.getBeanDefinition();
registry.registerBeanDefinition(RabbitAdmin.class.getName(), rabbitAdminBeanDefinition);
//注入RabbitTemplate
BeanDefinition rabbitTemplateBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(RabbitTemplate.class)
.addConstructorArgReference(CachingConnectionFactory.class.getName())
.getBeanDefinition();
registry.registerBeanDefinition(RabbitTemplate.class.getName(), rabbitTemplateBeanDefinition);
//注入RabbitMqServiceImpl
BeanDefinition rabbitMqServiceImplBeanDefinition = BeanDefinitionBuilder.rootBeanDefinition(RabbitMqServiceImpl.class)
.addConstructorArgReference(RabbitTemplate.class.getName())
.addConstructorArgReference(RabbitAdmin.class.getName())
.getBeanDefinition();
registry.registerBeanDefinition(RabbitMqServiceImpl.class.getName(), rabbitMqServiceImplBeanDefinition);
//注入交换机
for (ExchangeEnum value : ExchangeEnum.values()) {
switch (value.getType()){
case topic:
BeanDefinition topicExchange = BeanDefinitionBuilder.genericBeanDefinition(TopicExchange.class, () -> new TopicExchange(value.getName(), value.getDurable(), false)).getBeanDefinition();
registry.registerBeanDefinition(value.getName(),topicExchange);
break;
case direct:
BeanDefinition directExchange = BeanDefinitionBuilder.genericBeanDefinition(DirectExchange.class, () -> new DirectExchange(value.getName(), value.getDurable(), false)).getBeanDefinition();
registry.registerBeanDefinition(value.getName(),directExchange);
break;
case fanout:
BeanDefinition fanoutExchange = BeanDefinitionBuilder.genericBeanDefinition(FanoutExchange.class, () -> new FanoutExchange(value.getName(), value.getDurable(), false)).getBeanDefinition();
registry.registerBeanDefinition(value.getName(),fanoutExchange);
break;
default:
break;
}
}
for (QueueEnum value : QueueEnum.values()) {
BeanDefinition queue = BeanDefinitionBuilder.genericBeanDefinition(Queue.class,()->new Queue(value.getName())).getBeanDefinition();
registry.registerBeanDefinition(value.getName(),queue);
}
for (DelayQueueEnum value : DelayQueueEnum.values()) {
Map params = DelayQueueEnum.params(value.getDeadExchangeEnum().getName(), value.getDeadQueueEnum().getName(), value.getTtl());
BeanDefinition queue = BeanDefinitionBuilder.genericBeanDefinition(Queue.class,()->QueueBuilder.durable(value.getName()).withArguments(params).build()).getBeanDefinition();
registry.registerBeanDefinition(value.getName(),queue);
}
}
}
- 定义交换机,队列,延迟队列
package com.kcsm.ipp.commons.enums.rabbitmq;
public enum ExchangeEnum {
DEFAULT_EXCHANGE("default_exchange",ExchangeType.direct,true,"默认交换机"),
DEFAULT_DELAY_EXCHANGE("default_delay_exchange",ExchangeType.direct,true,"默认延迟交换机"),
DEFAULT_DEAD_EXCHANGE("default_dead_exchange",ExchangeType.direct,true,"默认死信交换机")
;
//交换机名称
private String name;
//交换机类型
private ExchangeType type;
//true持久化 交换机 消息在服务重启后存在
private Boolean durable;
//长时间不使用交换机系统自动删除
// private Boolean autoDelete;
//描述
private String desc;
ExchangeEnum(String name, ExchangeType type, Boolean durable, String desc) {
this.name = name;
this.type = type;
this.durable = durable;
this.desc = desc;
}
public String getName() {
return name;
}
public ExchangeType getType() {
return type;
}
public Boolean getDurable() {
return durable;
}
public String getDesc() {
return desc;
}
}
package com.kcsm.ipp.commons.enums.rabbitmq;
public enum ExchangeType {
direct,
fanout,
topic
}
package com.kcsm.ipp.commons.enums.rabbitmq;
import lombok.Getter;
import java.util.Map;
@Getter
public enum QueueEnum {
DEFAULT_QUEUE("default_queue",true,null,"默认队列"),
DEFAULT_DEAD_QUEUE("default_dead_queue",true,null,"默认死信队列"),
SERVICE_NOTIFY_DEAD_QUEUE("service_notify_dead_queue",true,null,"业务回调死信队列"),
;
//队列名称
private String name;
//持久换
private Boolean durable;
//参数
private Map arguments;
//队列描述
private String desc;
QueueEnum(String name, Boolean durable, Map arguments, String desc) {
this.name = name;
this.durable = durable;
this.arguments = arguments;
this.desc = desc;
}
}
package com.kcsm.ipp.commons.enums.rabbitmq;
import lombok.Getter;
import java.util.HashMap;
import java.util.Map;
@Getter
public enum DelayQueueEnum {
DEFAULT_DELAY_QUEUE("default_delay_queue",true,5000L,ExchangeEnum.DEFAULT_DEAD_EXCHANGE,QueueEnum.DEFAULT_DEAD_QUEUE,"默认延迟队列"),
SERVICE_NOTIFY_DELAY_QUEUE_15000("service_notify_delay_queue_15000",true,15000L,ExchangeEnum.DEFAULT_DEAD_EXCHANGE,QueueEnum.PAYMENT_SERVICE_NOTIFY_DEAD_QUEUE,"业务回调延迟队列"),
SERVICE_NOTIFY_DELAY_QUEUE_30000("service_notify_delay_queue_30000",true,30000L,ExchangeEnum.DEFAULT_DEAD_EXCHANGE,QueueEnum.PAYMENT_SERVICE_NOTIFY_DEAD_QUEUE,"业务回调延迟队列"),
;
//队列名称
private String name;
//持久换
private Boolean durable;
//延迟时间ms
private Long ttl;
//监听交换机
private ExchangeEnum deadExchangeEnum;
//监听队列
private QueueEnum deadQueueEnum;
//队列描述
private String desc;
DelayQueueEnum(String name, Boolean durable, Long ttl, ExchangeEnum deadExchangeEnum, QueueEnum deadQueueEnum, String desc) {
this.name = name;
this.durable = durable;
this.ttl = ttl;
this.deadExchangeEnum = deadExchangeEnum;
this.deadQueueEnum = deadQueueEnum;
this.desc = desc;
}
public static Map params(String deadExchange,String deadQueue,Long ttl){
// reply_to 队列
Map map = new HashMap<>();
//设置消息的过期时间 单位毫秒
map.put("x-message-ttl",ttl);
//设置附带的死信交换机
map.put("x-dead-letter-exchange",deadExchange);
//指定重定向的路由建 消息作废之后可以决定需不需要更改他的路由建 如果需要 就在这里指定
map.put("x-dead-letter-routing-key",deadQueue);
return map;
}
}
- 定义统一发送消息service
package com.kcsm.ipp.commons.service;
import com.kcsm.ipp.commons.enums.rabbitmq.DelayQueueEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.ExchangeEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.QueueEnum;
public interface MqService {
void send(Object msg, QueueEnum queueEnum) throws Exception;
void send(Object msg, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception;
void sendDelay(Object msg, DelayQueueEnum delayQueueEnum) throws Exception;
void sendDelay(Object msg, ExchangeEnum delayExchangeEnum,DelayQueueEnum delayQueueEnum) throws Exception;
}
package com.kcsm.ipp.commons.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kcsm.ipp.commons.enums.rabbitmq.DelayQueueEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.ExchangeEnum;
import com.kcsm.ipp.commons.enums.rabbitmq.QueueEnum;
import com.kcsm.ipp.commons.service.MqService;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.util.Map;
public class RabbitMqServiceImpl implements MqService {
private RabbitTemplate rabbitTemplate;
private RabbitAdmin rabbitAdmin;
public RabbitMqServiceImpl(RabbitTemplate rabbitTemplate, RabbitAdmin rabbitAdmin) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitAdmin = rabbitAdmin;
}
@Override
public void send(Object msg, QueueEnum queueEnum) throws Exception{
ExchangeEnum exchangeEnum = ExchangeEnum.DEFAULT_EXCHANGE;
this.send(msg,exchangeEnum,queueEnum);
}
@Override
public void send(Object msg, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception{
this.bindExchangeAndQueue(exchangeEnum,queueEnum);
Message message = MessageBuilder.withBody(new ObjectMapper().writevalueAsBytes(msg)).build();
rabbitTemplate.convertAndSend(exchangeEnum.getName(),queueEnum.getName(),message);
}
@Override
public void sendDelay(Object msg, DelayQueueEnum delayQueueEnum) throws Exception {
ExchangeEnum exchangeEnum = ExchangeEnum.DEFAULT_EXCHANGE;
this.sendDelay(msg,exchangeEnum,delayQueueEnum);
}
@Override
public void sendDelay(Object msg, ExchangeEnum delayExchangeEnum, DelayQueueEnum delayQueueEnum) throws Exception {
//绑定死信
this.bindExchangeAndQueue(delayQueueEnum.getDeadExchangeEnum(),delayQueueEnum.getDeadQueueEnum());
//绑定延迟
this.bindDelayExchangeAndQueue(delayExchangeEnum,delayQueueEnum);
Message message = MessageBuilder.withBody(new ObjectMapper().writevalueAsBytes(msg)).build();
rabbitTemplate.convertAndSend(delayExchangeEnum.getName(),delayQueueEnum.getName(),message);
}
private void createMyExchange(ExchangeEnum exchangeEnum) throws Exception{
switch (exchangeEnum.getType()){
case direct:
rabbitAdmin.declareExchange(new DirectExchange(exchangeEnum.getName(),exchangeEnum.getDurable(),false));
break;
case topic:
rabbitAdmin.declareExchange(new TopicExchange(exchangeEnum.getName(),exchangeEnum.getDurable(),false));
break;
case fanout:
rabbitAdmin.declareExchange(new FanoutExchange(exchangeEnum.getName(),exchangeEnum.getDurable(),false));
break;
default:
throw new Exception("请指定交换机类型");
}
}
private void createMyQueue(QueueEnum queueEnum) throws Exception{
if (null != queueEnum.getArguments() && queueEnum.getArguments().size() > 0){
rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),queueEnum.getDurable(),false,false,queueEnum.getArguments()));
return;
}
rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),queueEnum.getDurable()));
}
private void createMyDelayQueue(DelayQueueEnum delayQueueEnum) throws Exception{
Map params = DelayQueueEnum.params(delayQueueEnum.getDeadExchangeEnum().getName(), delayQueueEnum.getDeadQueueEnum().getName(), delayQueueEnum.getTtl());
Queue queue = QueueBuilder.durable(delayQueueEnum.getName()).withArguments(params).build();
rabbitAdmin.declareQueue(queue);
}
private void bindExchangeAndQueue(ExchangeEnum exchangeEnum,QueueEnum queueEnum) {
rabbitAdmin.declareBinding(new Binding(
queueEnum.getName(),
Binding.DestinationType.QUEUE,
exchangeEnum.getName(),
queueEnum.getName(),
null
));
}
private void bindDelayExchangeAndQueue(ExchangeEnum exchangeEnum,DelayQueueEnum delayQueueEnum) {
rabbitAdmin.declareBinding(new Binding(
delayQueueEnum.getName(),
Binding.DestinationType.QUEUE,
exchangeEnum.getName(),
delayQueueEnum.getName(),
null
));
}
}
- demo
发送消息
mqService.send(msg);



