RabbitMQ
channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual hos:虚拟主机,是对queue、exchange等资源的逻辑分组
基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列的消息接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列(生产者与消费者都需要声明队列,是防止队列不存在,重复声明不会影响)
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定(回调机制,绑定函数但要收到消息才执行)
安装MQ
docker pull rabbitmq:3-management
docker run -e RABBITMQ_DEFAULT_USER=yymq -e RABBITMQ_DEFAULT_PASS=123456@ --name mq --hostname mq1 -p 15672:15672 -p 5672:5672 -d rabbitmq:3-management
15672是管理网页端口
5672是消息队列通信端口
访问宿主机ip+15672端口
创建一个队列simple.queue
AMQP
应用间消息通信的一种协议,与语言和平台无关
SpringAMQP
使用RabbitMQ本身需要绑写的内容较多,Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两个部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
利用SpringAMQP实现Hello World中的基础消息队列功能
- 在父工程中引入Spring-amqp的依赖
- 在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
- 在consumer服务中编写消费逻辑,绑定simple.queue这个队列
父工程添加依赖
org.springframework.boot spring-boot-starter-amqp
创建publisher模块
在publisher中编写测试方法,向simple.queue发送消息
- application.yml中添加mq连接信息
spring:
rabbitmq:
host: 192.168.0.106 # rabbitM的IP
port: 5672
username: yymq
password: 123456@
virtual-host: /
- 在publisher服务中新建一个测试类,编写测试方法:
注意:如果报junit启动报错 Unable to find a @SpringBootConfiguration
需要把测试对应包与实际包名称一致,且添加一个启动类后再进行测试类运行。
package com.yy.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue(){
String queueName="simple.queue";
String message="hello,string amqp!";
rabbitTemplate.convertAndSend(queueName,message);
}
}
运行测试后,可以访问MQ网页查看推送的消息
在consumer模块中消费消息
- application.yml中添加mq连接信息
spring:
rabbitmq:
host: 192.168.0.106 # rabbitM的IP
port: 5672
username: yymq
password: 123456@
virtual-host: /
- 编写Bean
package com.yy.comsumer.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
//声明为bean,让Spring能够找到
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
//告诉spring要监听了队列
public void listenerSimpleQueueMessage(String msg) throws InterruptedException{
//String 是列表发的什么类型,此处就对应定义为什么消息
System.out.println("Spring 消费者接收到消:【" + msg +"】");
}
}
编写启动类测试
package com.yy.comsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class);
}
}
启动后查看日志
同时MQ中消息消费后就没有了



