视频地址:https://www.bilibili.com/video/BV1mr4y1J77n
之前面试的时候都会被问到为什么使用MQ,使用MQ的好处是什么,我都会照本宣科的说:异步、解耦、削峰,这几个词也好理解,都是字面意思,今天我们就来进一步加深理解异步和结解耦。
一、引入问题
先思考这样一个问题,在多个系统之间我们想要异步的调用怎么做呢?当然MQ就是一个很好的解决办法
- 如何去用呢?在A系统引入MQ,作为生产者,在B系统也引入MQ做消费者,当然可以实现功能,但会不会很麻烦?每个系统都要引入一套重复的东西。
- 大多数我们业务场景的并发量其实很小,如果我们对每个业务场景都弄一个自己的queue 是不是很浪费?管理起来也很麻烦。
- 如果有一个场景当我们做了某个操作之后,我们要通知A、B、C…系统来做相对应的处理,又该如何去做呢?(系统只会越来越多)
对于上面的问题,我们可以给出一个解决方案,那就是我们可以定义一个平台MQ,做成一个starter,谁要用我们就引入这个pom,每个项目都有自己的spring.application我们以这个为队列名称,注册到我们MQ里面,做一个广播消息,每一个服务既可以做生产者,也可以做消费者。
其实这里会有一个问题,比如我A服务发送一个消息了,B、C、D…服务都接受到了这个消息,但实际上只有B服务是需要消费这个消息的。很多人可能和我最开始的思路一样我在消息体里面加一个type,根据这个type来去判断谁消费。
// 接受到了消息,拿到了type
if(type == 1) {
// ...
}else if(type == 2) {
// ...
}
....
上面的代码当然可以解决我们的问题,但是想想每次新增一个事件都得去修改原本的接受逻辑,太low了。Spring框架里面已经做了这个操作ApplicationEventPublisher 通过这个类,我们就可以做到请求分发,根据class类型来。(具体后面讲解)
二、流程图
基于上面的理解,我画出了基础的流程图
2-1、系统交互流程图
每个服务都引入基础的mq-starter底包
每一个服务都可以作生产者,但每一个服务都是消费者。
2-2、具体服务内部流转图
三、代码实现
3-1、代码 (基于RabbitMQ实现) AutoConfigurationPlatformMq
这就是自动注入的核心代码了
import com.xdx97.mq.consumer.PlatformConsumer;
import com.xdx97.mq.provider.PlatformMqProvider;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
@Configuration
public class AutoConfigurationPlatformMq {
public static final String PLATFORM_EXCHANGE = "platform_exchange";
private final String projectQueue;
private final String projectRouteKey;
public AutoConfigurationPlatformMq(Environment environment) {
this.projectQueue = environment.getRequiredProperty("spring.application.name") + ".platform";
this.projectRouteKey = "spring.application.name." + environment.getRequiredProperty("spring.application.name");
}
@Bean
public Queue platformQueue() {
return QueueBuilder
.durable(projectQueue)
.build();
}
@Bean
public Exchange platformExchange() {
return ExchangeBuilder
.fanoutExchange(PLATFORM_EXCHANGE)
.durable(true)
.build();
}
@Bean
public Binding platformBinding() {
return BindingBuilder
.bind(platformQueue())
.to(platformExchange())
.with("*")
.and(null);
}
@Bean
public PlatformMqProvider platformMqProvider(){
return new PlatformMqProvider();
}
@Bean
public PlatformConsumer platformConsumer(){
return new PlatformConsumer();
}
}
PlatformMqProvider
import com.xdx97.mq.AutoConfigurationPlatformMq;
import com.xdx97.mq.event.PlatformEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import javax.annotation.Resource;
import java.util.function.Consumer;
@Slf4j
public class PlatformMqProvider {
@Value("${spring.application.name}")
private String source;
@Resource
private AmqpTemplate rabbitMqTemplate;
public void sendPlatformMessage(PlatformEvent platformEvent) {
platformEvent.setSource(source);
rabbitMqTemplate.convertAndSend(AutoConfigurationPlatformMq.PLATFORM_EXCHANGE,"*", platformEvent);
}
public void sendOtherMessage(Consumer consumer) {
consumer.accept(this.rabbitMqTemplate);
}
}
PlatformConsumer
import com.xdx97.mq.event.PlatformEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.messaging.handler.annotation.Payload;
@Slf4j
public class PlatformConsumer implements ApplicationEventPublisherAware {
@Autowired
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.publisher = applicationEventPublisher;
}
@RabbitHandler
@RabbitListener(queues = "${spring.application.name}.platform")
public void handler(@Payload PlatformEvent message) {
log.info("接受到平台事件消息:{}",message.toString());
publisher.publishEvent(message);
}
}
PlatformEvent
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.UUID;
@Data
public class PlatformEvent implements Serializable {
private static final long serialVersionUID=1L;
private String source;
private String transactionNo = UUID.randomUUID().toString();
private LocalDateTime eventTimeStamp = LocalDateTime.now();
}
pom
4.0.0 com.xdx97.mq xdx-mq-starter 1.0.0-SNAPSHOT org.springframework.boot spring-boot-dependencies 2.2.1.RELEASE pom import yyyyMMddHHmmss 8 8 org.springframework.boot spring-boot-starter-web runtime org.projectlombok lombok org.springframework.boot spring-boot-starter-amqp
3-2、使用 导入pom依赖
com.xdx97.mq xdx-mq-starter 1.0.0-SNAPSHOT
定义一个事件
其实就是一个继承PlatformEvent的实体类 (注:这个是要放在底包里面的)
@Data
public class TestEvent extends PlatformEvent{
private String name;
}
发送消息
任意服务
@Resource
private PlatformMqProvider platformMqProvider;
public void fun() {
TestEvent testEvent = new TestEvent();
testEvent.setName("小道仙");
platformMqProvider.sendPlatformMessage(testEvent);
}
消费消息
任意服务
@EventListener(TestEvent.class)
public void testListener(TestEvent testEvent){
// 业务处理
}
四、衍生问题 4-1、是否需要配置化注入
Spring里面提供一类以 @ConditionalOn 开头的注解,可以理解成在一定条件下进行注入
-
@ConditionalOnBean 当容器中存在某个bean才进行注入
-
@ConditionalOnProperty 当配置文件满足什么条件才进行注入
-
…
所以我在设计之初考虑如果我可以用这种方式去控制何时注入queue、何时注入exchange、何时注入生产者…
这看似提高了灵活度,但是仔细思考一下,别人如果引入你的包,不去把队列绑定到平台事件上,那就相当于无法发送消息和消费消息,那引入这个包的意义何在?
4-2、Spring组件扫描
最开始我的消费者和生产者都是如下这样去定义的,每个类上面都加了两个注解@Slf4j @Component
@Slf4j @Component public class PlatformConsumer implements ApplicationEventPublisherAware
@Slf4j 是日志注解自不必说,@Component 是注入bean的,但有一个前提是你的项目可以扫描到这个包
我们项目包名都是以公司的域名来命名的,而且扫描的范围一般都很大,基于这两个前提下我这个mq-starter是没有问题的,可以发送可以接受。
但如果使用者的项目包名不是以你这个命名的,那就完蛋。
这里理解一下自动注入,也就是引入了你的pom文件后,底包里面的bean应该是要自动注入的,上面这种做法不是自动注入,而是使用者的项目去扫描到而注入的。
改造后直接把bean注入放在AutoConfigurationPlatformMq 里面就好了
4-3、如何定义事件类型公共属性
所谓事件类型公共属性就是PlatformEvent类了,这个其实和功能实现无关,在我最开始实现了接受消息和发送消息后,我就以为我这个mq-starter已经完成了,当时这个类里面只有一个transactionNo 唯一标示id
这也是demo和生产最大的区别,如果我自己搭建demo,至此已经完美结束了
但在生产不行,平台事件是每个服务都可以发送的,只一个id无法知道具体的来源,后面在组长的帮助下加了系统来源和时间
这一点也很重要,一个完整的工程,不仅仅是代码功能的实现,还有业务的考量,还有代码的优美,比如你都命名a、b、c 这合理吗?
4-4、事件类型如何处理
所谓的事件类型就是一个个消息class,只要是继承了PlatformEvent的类都算
原本我是想在生产者端定义一个A.class 去继承PlatformEvent,在消费者端也同样来一个A.class 去继承PlatformEvent
想一想,这样的两个A.class是一样的吗?
最终解决办法是把这A.class放在底包里面就好了,这样消费者和生产者引入的都是一个A.class了,每次新增只需要重新 mvn deploy 一下即可。需要用到的服务去更新一个maven之前旧的服务不更新也不会出错
五、源码获取
关注微信公众号回复关键字获取
- 公众号:小道仙97
- 关键字:xdx-mq-starter



