使用一个订单案例来演示消息提供方:一个用户下一个订单,订单信息发送至fanout模式的交换机,这个交换机会绑定几个队列,然后会建立几个消费者分别为短信服务、邮件服务、SMS服务。用户一下单,就会通过短信+邮件+SMS 获得订单号的提示
首先创建一个springboot模块,勾选的模块有两个
如果网络不好,导致spring Initializr 创建springboot项目失败的话,就创建一个普通的maven工程,导入的坐标为
org.springframework.boot spring-boot-starter-parent 2.4.4 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test test org.springframework.amqp spring-rabbit-test test
然后编写配置文件
# 端口
server:
port: 8080
# RabbitMQ 服务配置
spring:
rabbitmq:
username: 用户名
password: 密码
host: 服务器ip
port: 5672
virtual-host: /
创建一个service
package com.hs.rabbitmq.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void makeOrder(String userid , String productid , int number){
// 1. 查看商品库存是否充足
// 2. 保存该订单
String orderId = UUID.randomUUID().toString();
// 3. 通过MQ来进行消息的分发 也就是将消息写入交换机
String exchangeName = "fanout_order_exchange";
String routingKey = "";
// 该方法是三个参数为 交换机名 + 路由key/队列名 + 要发送的消息
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}
}
这里只是把消息写入到某个交换机中,但是交换机的创建、队列的创建、交换机绑定队列还没有完成。也可以web界面完成 ,接下来是代码的方式实现。这里需要创建一个config
package com.hs.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqConfiguration {
@Bean
public FanoutExchange fanoutExchange(){
// 在创建对象的时候指定交换机名
return new FanoutExchange("fanout_order_exchange");
}
@Bean
public Queue queueNote(){
// 队列名 + 是否持久化
return new Queue("order.queue.note",true);
}
@Bean
public Queue queueEmail(){
return new Queue("order.queue.email",true);
}
@Bean
public Queue queueSMS(){
return new Queue("order.queue.sms",true);
}
@Bean
public Binding smsBinding(){
// bind(队列对象).to(交换机对象) 这里的两个方法参数中的方法就是我们上面自定义的方法
return BindingBuilder.bind(queueSMS()).to(fanoutExchange());
}
@Bean
public Binding emailBinding(){
return BindingBuilder.bind(queueEmail()).to(fanoutExchange());
}
@Bean
public Binding noteBinding(){
return BindingBuilder.bind(queueNote()).to(fanoutExchange());
}
}
创建一个主启动类
@SpringBootApplication
public class SpringbootOrderRabbitmqProducer {
public static void main(String[] args) {
SpringApplication.run(SpringbootOrderRabbitmqProducer.class,args);
}
}
创建一个测试类
@SpringBootTest
public class SpringbootOrderRabbitmqProducerTest {
@Autowired
private OrderService orderService;
@Test
void test1(){
// 随便输入用户id 商品id 数量
orderService.makeOrder("142" , "111" ,10);
}
}
运行后,进入web界面可以发现,队列和交换机正常绑定了,并且这几个队列中的消息也有了。
消费方的实现创建model,导入相应的坐标,和消息提供方一样
创建配置文件,也和消息提供方一样,只是端口一样会冲突,需要改一下端口
接下来编写几个service,分别为那几个消息消费者,这是email的消费者 另外几个也是一模一样的代码
@Service
@RabbitListener(queues = {"order.queue.email"})
public class EmailConsumer {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("email服务消费的消息是:" + message);
}
}
然后创建一个主启动类,启动即可消费到队列中的服务了。
集成direct模式direct模式和fanout模式相比只是多了一个路由key,其他的大致相同,首先第一个要更改的地方就是配置类,需要将声明fanout模式的交换机的地方改为声明direct模式的交换机,第二个需要更改的地方就是将交换机与队列进行绑定的时候需要添加
package com.hs.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMqDirectConfiguration {
@Bean
public DirectExchange directExchange(){
// 在创建对象的时候指定交换机名
return new DirectExchange("direct_order_exchange");
}
@Bean
public Queue queueNote(){
// 队列名 + 是否持久化
return new Queue("order.queue.note",true);
}
@Bean
public Queue queueEmail(){
return new Queue("order.queue.email",true);
}
@Bean
public Queue queueSMS(){
return new Queue("order.queue.sms",true);
}
@Bean
public Binding smsBinding(){
// bind(队列对象).to(交换机对象).with(路由key) 这里的两个方法参数中的方法就是我们上面自定义的方法
return BindingBuilder.bind(queueSMS()).to(directExchange()).with("sms");
}
@Bean
public Binding emailBinding(){
return BindingBuilder.bind(queueEmail()).to(directExchange()).with("email");
}
@Bean
public Binding noteBinding(){
return BindingBuilder.bind(queueNote()).to(directExchange()).with("note");
}
}
然后就是在service里面,将消息发送至交换机的地方修改一下交换机名 以及 路由key
public void makeOrder(String userid , String productid , int number){
// 1. 查看商品库存是否充足
// 2. 保存该订单
String orderId = UUID.randomUUID().toString();
// 3. 通过MQ来进行消息的分发 也就是将消息写入交换机
String exchangeName = "direct_order_exchange";
String routingKey = "email";
// 该方法是三个参数为 交换机名 + 路由key/队列名 + 要发送的消息
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
// 如果还想给另外的路由key也发送就直接在写一条这个语句即可
rabbitTemplate.convertAndSend(exchangeName,"sms",orderId);
}
集成topic模式
springboot对于RabbitMQ的集成为我们提供了两种方式,注解和配置类。
如果我们想springboot集成topic模式,也可以想上面那样在配置类中声明一个topic类型的交换机,然后创建几个队列,再进行交换机和队列的绑定关系。
另一种方式就是基于注解的实现,做的事情还是一样的,创建交换机和队列然后进行绑定。
首先是消息提供方的代码
public void makeOrder(String userid , String productid , int number){
// 1. 查看商品库存是否充足
// 2. 保存该订单
String orderId = UUID.randomUUID().toString();
// 3. 通过MQ来进行消息的分发 也就是将消息写入交换机
String exchangeName = "这里写一个交换机名";
String routingKey = "这里是模糊路由key";
// 该方法是三个参数为 交换机名 + 路由key/队列名 + 要发送的消息
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
}
然后是消息消费方
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "topic.order.emailQueue" , durable = "true" , autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange" , type = ExchangeTypes.TOPIC),
key = "#.email.#"
))
public class EmailConsumer {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("email服务消费的消息是:" + message);
}
}
接下来的几个和这个一样,修改一下注解中的 队列名和路由key即可
总结RabbitMQ 我们总共仅仅需要做五件事
-
创建exchange交换机
-
创建queue队列
-
交换机绑定队列,设置Routing Key 路由key
-
消费方从队列中去取消息
-
提供方往交换机中发消息
知识点小结:
- 如果没有指定交换机,使用的是默认的交换机
- 工作模式有两种消息分发策略:轮询+公平
- fanout模式就只是需要我们自己创建交换机,然后为每个绑定了的队列推送消息
- direct模式增加了路由key
- topic模式支持模糊路由key
注意的几点:
- 5672端口在服务器的防火墙以及云服务器官网安全组要开放
- 交换机与队列绑定关系最好还是在消费方进行
- 在web管理界面 、配置类方式、注解方式 想用哪种就用哪种,最好还是用配置类的方式



