栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Springboot下RabbitMQ 封装实现,支持点对点消息,广播消息、延时消息

Springboot下RabbitMQ 封装实现,支持点对点消息,广播消息、延时消息

一、为什么要封装

        在软件项目开发中,基本上都是多人共同开发,尤其是大型项目。而消息队列更是大多数业务都会使用,前期基本上都是在业务中直接注入RabbitTemplate,创建各自的交换机、队列、监听者完成业务开发,但是到了项目后期,大量的交换机和队列的定义代码充斥在各个模块,每次需要回溯业务逻辑的时候都在不停的翻找代码,寻找监听者。

致命的是,通常我们找到了发消息的代码,由于通过RabbitTemplate发送消息是直接发给交换机的,想要找到处理消息的消费者业务代码,还需要通过交换机定义代码——交换机队列绑定——队列——监听者,这样繁琐的操作。同样的当你找了消息消费者的代码却不知道都有项目哪块对该队列发送了消息,如此总总浪费了开发人员大量的时间。

于是笔者为了消灭这种情况决定对RabbitMQ操作进行封装,目的是实现以下功能:

1、代码统一,交换机和队列复用,消灭项目定义交换机及队列的代码。

2、使用简单,像使用工具类一样,拿来即用。

3、流程清晰,在发送消息的地方指明消费者代码位置,同时通过消费者代码亦可找到有多少消息发送者。

4、异常处理增强,当消费者报错时候,通过优雅的方式处理异常,而不是rabbitmq反复重试,导致满屏错误日志。

二、代码

封装后总共有4个.java文件,分别是:

MqConf.java——>用于配置RabbitMq,定义公共交换机、队列、统一消费者等。

MqListener.java——>抽象类,使用时定义消息消费者业务代码需要继承此类实现onReceive和onError方法。

MqTool.java——>定义对外调用方法,实际使用时直接调用该类方法发送消息。

MqMsg.java——>实际发送的消息包装体。

MqConf.java

import com.bohuikeji.frame.module.admin.base.MqListener;
import com.bohuikeji.frame.module.admin.entity.mq.MqMsg;
import com.bohuikeji.frame.module.admin.thread.CachedThreadPool;
import com.bohuikeji.frame.module.admin.thread.MqListenerThread;
import com.bohuikeji.frame.module.admin.utils.SpringUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;


@Slf4j
@Configuration("ranch-rabbitmq-conf")
public class MqConf {

    
    public static final String QUEUE_RANCH_ONE_TO_ONE="queue.ranch.one.to.one";
    
    public static final String QUEUE_RANCH_ONE_TO_ALL="queue.ranch.one.to.all."+System.currentTimeMillis();

    
    public static final String QUEUE_RANCH_DELAY="queue.ranch.delay";

    
    public static final String EXCHANGE_FANOUT_RANCH ="exchange.fanout.ranch";

    
    public static final String EXCHANGE_DELAY_RANCH="exchange.delay.ranch";


    @Bean
    public Queue oneToOneQueue() {return new Queue(QUEUE_RANCH_ONE_TO_ONE, true,false,true);}

    @Bean
    public Queue oneToAllQueue(){return new Queue(QUEUE_RANCH_ONE_TO_ALL, true,false,true);}

    @Bean
    public Queue delayQueue(){return new Queue(QUEUE_RANCH_DELAY, true,false,true);}

    
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(EXCHANGE_FANOUT_RANCH);
    }

    
    @Bean
    public CustomExchange delayExchange(){
        Map args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_DELAY_RANCH,"x-delayed-message",true, false,args);
    }

    
    @Bean
    public Binding fanoutBuild(Queue oneToAllQueue, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(oneToAllQueue).to(fanoutExchange);
    }

    
    @Bean
    public Binding delayBuild(Queue delayQueue, CustomExchange delayExchange){
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(QUEUE_RANCH_DELAY).noargs();
    }


    
    @RabbitHandler
    @RabbitListener(queues = {QUEUE_RANCH_ONE_TO_ONE,QUEUE_RANCH_DELAY,"#{oneToAllQueue.name}"},priority = "${spring.rabbitmq.priority}")
    private void news(MqMsg msg) {
        MqListener listener= SpringUtil.getBean(msg.getListener());
        try {
            CachedThreadPool.execute(new MqListenerThread(listener,msg.getData()));
        } catch (Exception e) {
            log.error("【消息队列监听者业务异常】错误信息:{},位于:{}",e.getMessage(),listener.getClass(),e);
            listener.onError(msg.getData(),e);
        }
    }
}

MqListener.java 

public abstract class MqListener {

    
    public abstract boolean onReceive(T msg);

    
    public abstract void onError(T msg,Exception e);

}

 MqTool.java

import com.bohuikeji.frame.module.admin.base.MqListener;
import com.bohuikeji.frame.module.admin.config.MqConf;
import com.bohuikeji.frame.module.admin.entity.mq.MqMsg;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;


@Component
@RequiredArgsConstructor
public class MqTool {

    final RabbitTemplate rabbitTemplate;

    
    public  void sendMsg(T msg,Class> listenerClass){
        rabbitTemplate.convertAndSend(MqConf.QUEUE_RANCH_ONE_TO_ONE,new MqMsg(msg,listenerClass));
    }

    
    public  void sendMsgToAll(T msg,Class> listenerClass){
        rabbitTemplate.convertAndSend(MqConf.QUEUE_RANCH_ONE_TO_ALL,new MqMsg(msg,listenerClass));
    }

    
    public  void sendDelayMsg(T msg, Class> listenerClass, Duration duration){
        rabbitTemplate.convertAndSend(MqConf.EXCHANGE_DELAY_RANCH, MqConf.QUEUE_RANCH_DELAY, new MqMsg(msg,listenerClass), message ->{
            message.getMessageProperties().setHeader("x-delay",duration.toMillis());
            return message;
        });
    }
}

 MqMsg.java

import com.bohuikeji.frame.module.admin.base.MqListener;
import lombok.Data;
import java.io.Serializable;


@Data
public class MqMsg implements Serializable {

    
    T data;
    
    Class> listener;

    
    public MqMsg(T msg, Class> listener) {
        this.data=msg;
        this.listener=listener;
    }
}

 在MqConf中统一监听者收到消息后,是用线程池给每个消息分配一个线程去处理的,这里把线程池的代码也贴出来,还用到一个springUtils的工具,用于获取每个消息对应消费者的bean,以下一同贴出代码。

线程池定义代码(注意这里使用了HuTool工具类):

import cn.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.*;
import static java.lang.Integer.MAX_VALUE;


public class CachedThreadPool {

    
    private static class ThreadPoolHolder {
        //定义一个线程池,静态内部类单例方式
        private static final ExecutorService EXEC;

        static {
            
            EXEC = new ThreadPoolExecutor(4, MAX_VALUE,
                    60L, TimeUnit.SECONDS,
                    new SynchronousQueue(),new NamedThreadFactory("ranchThread",false));
        }
    }

    
    public static void execute(Runnable command) throws Exception {
        ThreadPoolHolder.EXEC.submit(command);
    }

 消息处理线程定义代码:

import com.bohuikeji.frame.module.admin.base.MqListener;


public class MqListenerThread implements Runnable{

    private MqListener listener;
    private Object msg;

    public MqListenerThread(MqListener listener, Object msg){
        this.listener=listener;
        this.msg=msg;
    }

    @Override
    public void run() {
        listener.onReceive(msg);
    }
}

SpringUtil工具类的代码:

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;


@Component
public class SpringUtil implements ApplicationContextAware {
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (SpringUtil.applicationContext == null) {
            SpringUtil.applicationContext = applicationContext;
        }
    }

    
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    
    public static Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    
    public static  T getBean(Class clazz) {
        return getApplicationContext().getBean(clazz);
    }

    
    public static  T getBean(String name, Class clazz) {
        return getApplicationContext().getBean(name, clazz);
    }

    
    public static String getActiveProfile() {
        return applicationContext.getEnvironment().getActiveProfiles()[0];
    }


    
    public static String getProperty(String propertyName) {
        return applicationContext.getEnvironment().getProperty(propertyName);
    }
三、使用

使用仅需2步

1、定义编写消费者代码,也就是继承MqListener类,并实现。

2、在需要发消息的调用Mqtool类的方法,发送消息。

示例如下:

这是消费者业务定义代码(可以理解为消息监听者):

import com.bohuikeji.frame.module.admin.base.MqListener;
import org.springframework.stereotype.Service;


@Service
public class TestListener extends MqListener {
    @Override
    public boolean onReceive(String msg) {
        System.out.println("TestListener接收到消息的:"+msg);
        return true;
    }

    @Override
    public void onError(String msg, Exception e) {
        System.out.println("业务代码出现异常,可在此处进行处理!");

    }
}

这是发送消息的代码,示例发送了三种类型消息:

@Autowired MqTool mqTool;

    
    @GetMapping("/test")
    public void test(){
        //定义字符串消息
        String msg="这是一条字符串消息";

        //发送普通消息:集群环境下,仅有一个消费者会收到消息
        mqTool.sendMsg(msg,TestListener.class);

        //发送广播消息:集权环境下,所有的集群节点(消费者)都会收到消息
        mqTool.sendMsgToAll(msg, TestListener.class);

        //发送延迟消息:延时20秒后,仅有一个消费者会收到消息
        mqTool.sendDelayMsg(msg,TestListener.class, Duration.ofSeconds(20));
    }

补充:如需发送其他类型消息,如Ineger,实体包装类等,则定义消费者代码时候指定泛型的类型即可,示例如下:

@Service
public class TestListener extends MqListener {
    @Override
    public boolean onReceive(SysUser user) {
        System.out.println("TestListener接收到消息的:"+user);
        return true;
    }

    @Override
    public void onError(SysUser user, Exception e) {
        System.out.println("业务代码出现异常,可在此处进行处理!");

    }
}
四、结束

封装后的代码已经在笔者的项目中投入生产使用了,后续可能会再消息被动重试、主动重试、以及异常处理方面再做加强,如果您有好的意见或想法,欢迎留言评论。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/741863.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号