1.初始MQ
1.1同步通讯和异步通讯1.2 同步调用的问题1.3异步调用1.4什么是MQ? 2.MQ安装
2.1RabbitMQ概述和安装2.2常见消息类型 3.SpringAMQP
3.1什么是SpringAMQP3.2利用SpringAMQP实现HelloWorld中的基础消息队列功能3.3SpringAMQP-WorkQueue模型3.4SpringAMQP-发布订阅模型3.5SpringAMQP-发布订阅模型-Fanout Exchange3.6SpringAMQP-发布订阅模型-Direct Exchange3.7SpringAMQP-发布订阅模型-Topic Exchange3.8SpringAMQP-消息转换器
1.初始MQ 1.1同步通讯和异步通讯 1.2 同步调用的问题微服务间基于Feign的调用就属于同步方式,存在一些问题
异步调用常见实现就是事件驱动模式
MQ(Message Queue):中文消息队列,字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官方地址https://www.rabbitmq.com/
RabbitMQ的结构和概念
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
流程如下:
1.在父工程中引入spring-amqp的依赖
org.springframework.boot spring-boot-starter-amqp
2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
在publisher服务中编写application.yml,添加mq连接信息:
spring: rabbitmq: host: 192.168.150.101 port: 5672 username: itcast password: 123321 virtual-host: /在publisher服务中新建一个测试类,编写测试方法
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue(){
String queueName="simple.queue";
String message ="hello,spring amqp";
rabbitTemplate.convertAndSend(queueName,message);
}
}
3.在consumer服务中编写消费逻辑,绑定simple.queue这个队列
在consumer服务中编写application.yml,添加mq连接信息:
spring:
rabbitmq:
host: 192.168.150.101
port: 5672
username: itcast
password: 123321
virtual-host: /
在consumer服务中新建一个类,编写消费逻辑:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException{
System.out.println("消费者接收到simple.queue的消息:【"+msg+"】");
}
}
3.3SpringAMQP-WorkQueue模型
WorkQueue 工作队列,可以提高消息处理速度,避免队列消息堆积
模拟WorkQueue,实现一个队列绑定多个消费者
基本思路如下:
在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue(){
String queueName="simple.queue";
String message ="hello,spring amqp";
rabbitTemplate.convertAndSend(queueName,message);
}
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
String queueName="simple.queue";
String message ="hello,spring amqp";
for (int i = 1; i <50 ; i++) {
rabbitTemplate.convertAndSend(queueName,message+i);
Thread.sleep(20);
}
}
}
在consumer服务中定义两个消息监听者,都监听simple.queue队列
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException{
System.out.println("消费者1接收到消息:【"+msg+"】"+ LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException{
System.out.println("消费者2........接收到消息:【"+msg+"】"+LocalTime.now());
Thread.sleep(200);
}
}
消费者1每秒处理50条消息,消费者2每秒处理10条消息
发布订阅模式与之前案例的却别就是允许同一消息发送给多个消费者。实现方式是加入了exchange(交换机),常见exchange类型包含:
Fanout:广播Direct :路由Topic :话题
3.5SpringAMQP-发布订阅模型-Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)
Topic Exchange与Direct Exchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割



