在软件项目开发中,基本上都是多人共同开发,尤其是大型项目。而消息队列更是大多数业务都会使用,前期基本上都是在业务中直接注入RabbitTemplate,创建各自的交换机、队列、监听者完成业务开发,但是到了项目后期,大量的交换机和队列的定义代码充斥在各个模块,每次需要回溯业务逻辑的时候都在不停的翻找代码,寻找监听者。
致命的是,通常我们找到了发消息的代码,由于通过RabbitTemplate发送消息是直接发给交换机的,想要找到处理消息的消费者业务代码,还需要通过交换机定义代码——交换机队列绑定——队列——监听者,这样繁琐的操作。同样的当你找了消息消费者的代码却不知道都有项目哪块对该队列发送了消息,如此总总浪费了开发人员大量的时间。
于是笔者为了消灭这种情况决定对RabbitMQ操作进行封装,目的是实现以下功能:
1、代码统一,交换机和队列复用,消灭项目定义交换机及队列的代码。
2、使用简单,像使用工具类一样,拿来即用。
3、流程清晰,在发送消息的地方指明消费者代码位置,同时通过消费者代码亦可找到有多少消息发送者。
4、异常处理增强,当消费者报错时候,通过优雅的方式处理异常,而不是rabbitmq反复重试,导致满屏错误日志。
二、代码封装后总共有4个.java文件,分别是:
MqConf.java——>用于配置RabbitMq,定义公共交换机、队列、统一消费者等。
MqListener
MqTool.java——>定义对外调用方法,实际使用时直接调用该类方法发送消息。
MqMsg
MqConf.java
import com.bohuikeji.frame.module.admin.base.MqListener;
import com.bohuikeji.frame.module.admin.entity.mq.MqMsg;
import com.bohuikeji.frame.module.admin.thread.CachedThreadPool;
import com.bohuikeji.frame.module.admin.thread.MqListenerThread;
import com.bohuikeji.frame.module.admin.utils.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration("ranch-rabbitmq-conf")
public class MqConf {
public static final String QUEUE_RANCH_ONE_TO_ONE="queue.ranch.one.to.one";
public static final String QUEUE_RANCH_ONE_TO_ALL="queue.ranch.one.to.all."+System.currentTimeMillis();
public static final String QUEUE_RANCH_DELAY="queue.ranch.delay";
public static final String EXCHANGE_FANOUT_RANCH ="exchange.fanout.ranch";
public static final String EXCHANGE_DELAY_RANCH="exchange.delay.ranch";
@Bean
public Queue oneToOneQueue() {return new Queue(QUEUE_RANCH_ONE_TO_ONE, true,false,true);}
@Bean
public Queue oneToAllQueue(){return new Queue(QUEUE_RANCH_ONE_TO_ALL, true,false,true);}
@Bean
public Queue delayQueue(){return new Queue(QUEUE_RANCH_DELAY, true,false,true);}
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(EXCHANGE_FANOUT_RANCH);
}
@Bean
public CustomExchange delayExchange(){
Map args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE_DELAY_RANCH,"x-delayed-message",true, false,args);
}
@Bean
public Binding fanoutBuild(Queue oneToAllQueue, FanoutExchange fanoutExchange){
return BindingBuilder.bind(oneToAllQueue).to(fanoutExchange);
}
@Bean
public Binding delayBuild(Queue delayQueue, CustomExchange delayExchange){
return BindingBuilder.bind(delayQueue).to(delayExchange).with(QUEUE_RANCH_DELAY).noargs();
}
@RabbitHandler
@RabbitListener(queues = {QUEUE_RANCH_ONE_TO_ONE,QUEUE_RANCH_DELAY,"#{oneToAllQueue.name}"},priority = "${spring.rabbitmq.priority}")
private void news(MqMsg> msg) {
MqListener listener= SpringUtil.getBean(msg.getListener());
try {
CachedThreadPool.execute(new MqListenerThread(listener,msg.getData()));
} catch (Exception e) {
log.error("【消息队列监听者业务异常】错误信息:{},位于:{}",e.getMessage(),listener.getClass(),e);
listener.onError(msg.getData(),e);
}
}
}
MqListener.java
public abstract class MqListener{ public abstract boolean onReceive(T msg); public abstract void onError(T msg,Exception e); }
MqTool.java
import com.bohuikeji.frame.module.admin.base.MqListener;
import com.bohuikeji.frame.module.admin.config.MqConf;
import com.bohuikeji.frame.module.admin.entity.mq.MqMsg;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
@Component
@RequiredArgsConstructor
public class MqTool {
final RabbitTemplate rabbitTemplate;
public void sendMsg(T msg,Class extends MqListener> listenerClass){
rabbitTemplate.convertAndSend(MqConf.QUEUE_RANCH_ONE_TO_ONE,new MqMsg(msg,listenerClass));
}
public void sendMsgToAll(T msg,Class extends MqListener> listenerClass){
rabbitTemplate.convertAndSend(MqConf.QUEUE_RANCH_ONE_TO_ALL,new MqMsg(msg,listenerClass));
}
public void sendDelayMsg(T msg, Class extends MqListener> listenerClass, Duration duration){
rabbitTemplate.convertAndSend(MqConf.EXCHANGE_DELAY_RANCH, MqConf.QUEUE_RANCH_DELAY, new MqMsg(msg,listenerClass), message ->{
message.getMessageProperties().setHeader("x-delay",duration.toMillis());
return message;
});
}
}
MqMsg.java
import com.bohuikeji.frame.module.admin.base.MqListener; import lombok.Data; import java.io.Serializable; @Data public class MqMsgimplements Serializable { T data; Class> listener; public MqMsg(T msg, Class extends MqListener > listener) { this.data=msg; this.listener=listener; } }
在MqConf中统一监听者收到消息后,是用线程池给每个消息分配一个线程去处理的,这里把线程池的代码也贴出来,还用到一个springUtils的工具,用于获取每个消息对应消费者的bean,以下一同贴出代码。
线程池定义代码(注意这里使用了HuTool工具类):
import cn.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.*;
import static java.lang.Integer.MAX_VALUE;
public class CachedThreadPool {
private static class ThreadPoolHolder {
//定义一个线程池,静态内部类单例方式
private static final ExecutorService EXEC;
static {
EXEC = new ThreadPoolExecutor(4, MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue(),new NamedThreadFactory("ranchThread",false));
}
}
public static void execute(Runnable command) throws Exception {
ThreadPoolHolder.EXEC.submit(command);
}
消息处理线程定义代码:
import com.bohuikeji.frame.module.admin.base.MqListener;
public class MqListenerThread implements Runnable{
private MqListener listener;
private Object msg;
public MqListenerThread(MqListener listener, Object msg){
this.listener=listener;
this.msg=msg;
}
@Override
public void run() {
listener.onReceive(msg);
}
}
SpringUtil工具类的代码:
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class SpringUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringUtil.applicationContext == null) {
SpringUtil.applicationContext = applicationContext;
}
}
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
public static Object getBean(String name) {
return getApplicationContext().getBean(name);
}
public static T getBean(Class clazz) {
return getApplicationContext().getBean(clazz);
}
public static T getBean(String name, Class clazz) {
return getApplicationContext().getBean(name, clazz);
}
public static String getActiveProfile() {
return applicationContext.getEnvironment().getActiveProfiles()[0];
}
public static String getProperty(String propertyName) {
return applicationContext.getEnvironment().getProperty(propertyName);
}
三、使用
使用仅需2步
1、定义编写消费者代码,也就是继承MqListener
2、在需要发消息的调用Mqtool类的方法,发送消息。
示例如下:
这是消费者业务定义代码(可以理解为消息监听者):
import com.bohuikeji.frame.module.admin.base.MqListener; import org.springframework.stereotype.Service; @Service public class TestListener extends MqListener{ @Override public boolean onReceive(String msg) { System.out.println("TestListener接收到消息的:"+msg); return true; } @Override public void onError(String msg, Exception e) { System.out.println("业务代码出现异常,可在此处进行处理!"); } }
这是发送消息的代码,示例发送了三种类型消息:
@Autowired MqTool mqTool;
@GetMapping("/test")
public void test(){
//定义字符串消息
String msg="这是一条字符串消息";
//发送普通消息:集群环境下,仅有一个消费者会收到消息
mqTool.sendMsg(msg,TestListener.class);
//发送广播消息:集权环境下,所有的集群节点(消费者)都会收到消息
mqTool.sendMsgToAll(msg, TestListener.class);
//发送延迟消息:延时20秒后,仅有一个消费者会收到消息
mqTool.sendDelayMsg(msg,TestListener.class, Duration.ofSeconds(20));
}
补充:如需发送其他类型消息,如Ineger,实体包装类等,则定义消费者代码时候指定泛型
@Service public class TestListener extends MqListener四、结束{ @Override public boolean onReceive(SysUser user) { System.out.println("TestListener接收到消息的:"+user); return true; } @Override public void onError(SysUser user, Exception e) { System.out.println("业务代码出现异常,可在此处进行处理!"); } }
封装后的代码已经在笔者的项目中投入生产使用了,后续可能会再消息被动重试、主动重试、以及异常处理方面再做加强,如果您有好的意见或想法,欢迎留言评论。



