- 一.RabbitMQ安装
- 二.SpringAMQP
- 1.Basic Queue 简单队列模型
- 2.Work Queue 工作队列模型
- 3.发布订阅模型
- 3.1 FanoutExchange(广播交换机)
- 3.2 DirectExchange(路由交换机)
- 3.3 TopicExchange(话题交换机)
①拉取RabbitMQ镜像 docker pull rabbitmq
②运行安装命令
15672是图形化界面的端口号
docker run
-e RABBITMQ_DEFAULT_USER=itcast
-e RABBITMQ_DEFAULT_PASS=123321
–name mq
–hostname mq1
-p 15672:15672
-p 5672:5672
-d
rabbitmq
③安装图形化界面的插件
- docker exec -it mq容器的id /bin/bash
例如:docker exec -it ddfbc46c13e7 /bin/bash - rabbitmq-plugins enable rabbitmq_management
④登录可视化界面 IP:15672
输入账户名itcast 密码123321
①创建一个普通的maven项目,作为父工程,父项目引入相关依赖。
4.0.0 org.example RabbitMQ pom 1.0-SNAPSHOT publisher consumer org.springframework.boot spring-boot-starter-parent 2.4.12 8 8 org.springframework.boot spring-boot-starter-amqp 2.4.12 org.springframework.boot spring-boot-starter-test com.fasterxml.jackson.core jackson-databind
②创建微服务消息发布者publisher
- 编写启动类,在application.yml进行配置
spring:
rabbitmq:
host: 192.168.10.128
port: 5672
virtual-host: /
username: itcast
password: 123321
- 编写测试类SpringAmqpTest
package com.gzhu;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
在rabbitmq的可视化界面手动创建一个消息队列
- 启动测试类,可以看到消息已进入队列
③消费者consumer - 编写启动类,在application.yml配置
spring:
rabbitmq:
host: 192.168.10.128
port: 5672
virtual-host: /
username: itcast
password: 123321
- 编写消费者
package com.gzhu.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息 :【" + msg + "】"); }
}
- 启动,可以发现控制台接收到了消息,而且simple.queue队列中的消息也没了
- 测试类,模拟消息发布者在1秒内发送了50条消息
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "work.queue";
String message = "hello, I am _";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName, message+ i);
Thread.sleep(20);
}
}
- 设置两个消费者,两者处理能力不同
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者1接收到消息 :【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
System.err.println("spring 消费者2接收到消息 :【" + msg + "】"+ LocalTime.now());
Thread.sleep(200);
}
在application.yml中配置如下信息,因为两者的处理能力不同,所以设置消费者预取消息数量为1,只有处理完当前的消息,才能获取下一条消息,否则AMQP默认情况两个消费者会平均读取消息才处理
spring:
rabbitmq:
host: 192.168.10.128
port: 5672
virtual-host: /
username: itcast
password: 123321
listener:
simple:
prefetch: 1 # 只能预先取得1条消息,处理完成才可以接下一条
- 先启动消费者,再启动测试类,消息在1秒左右的时间被处理完成了
交换机的作用
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
声明队列、交换机、绑定关系的Bean
- Queue
- FanoutExchange
- Binding
FanoutExchange的会将消息路由到每个绑定的队列
1.在consumer服务中,编写一个配置类,利用代码声明队列、交换机,并将两者绑定
package com.gzhu.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
//声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("gzhu.fanout");
}
//队列1
@Bean
public Queue fanouQueue1(){
return new Queue("fanout.queue1");
}
//绑定关系1
@Bean
public Binding fanoutBinding1(Queue fanouQueue1, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanouQueue1)
.to(fanoutExchange);
}
//队列2
@Bean
public Queue fanouQueue2(){
return new Queue("fanout.queue2");
}
//绑定关系2
@Bean
public Binding fanoutBinding2(Queue fanouQueue2, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanouQueue2)
.to(fanoutExchange);
}
}
-去控制台查看交换机信息,但在这遇到了500的错误码,解决方法:
链接: https://blog.csdn.net/shentian885/article/details/120905570?spm=1001.2101.3001.6661.1&utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7ECTRLIST%7Edefault-1.no_search_link
解决问题后,可以看到交换机和队列绑定在了一起
2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues = "fanout.queue1")
public void listenWorkQueueMessage(String msg) throws InterruptedException {
System.out.println("fanout.queue1接收到消息 :【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
System.err.println("fanout.queue2接收到消息 :【" + msg + "】");
}
3.在publisher中编写测试方法,向gzhu.fanout交换机发送消息
@Test
public void testSendFanoutExchange(){
//交换机名称写死
String exchangeName = "gzhu.fanout";
//消息
String message = "hello, baby!!";
//发送消息
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
4.启动消费者,启动测试类,可以看到每一个消费者都接收到了消息
3.2 DirectExchange(路由交换机)Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)
1.使用注解的方式在consumer消费者服务中,将消费者与队列、路由机绑定
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "gzhu.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到Direct消息:【"+msg+"】");}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "gzhu.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}))public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到Direct消息:【"+msg+"】 ");
}
2.消息发布者测试类发布消息
@Test
public void testDirectExchange() {
// 队列名称
String exchangeName = "gzhu.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息,参数依次为:交换机名称,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
3.启动消费者,再启动发布者测试类
Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词
1.编写消费者
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "gzhu.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者1接收到Topic消息:【"+msg+"】");}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "gzhu.topic", type = ExchangeTypes.TOPIC),
key = "japan.#"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者2接收到Topic消息:【"+msg+"】");}
2.发布者测试类
@Test
public void testTopicExchange() {
// 队列名称
String exchangeName = "gzhu.topic";
// 消息
String message = "祖国统一,收复台湾 !";
// 发送消息,参数依次为:交换机名称,RoutingKey,消息
rabbitTemplate.convertAndSend(exchangeName, "china.song", message);
}
3.启动消费者,再运行测试类



