消息中间件的应用场景主要有:异步处理、应用解耦、流量削峰等。
生产者发送消息通过不同类型的交互机发送到不同的消息队列中。
消费者只关心消息队列,与交换机无关,至于生产者如何发送的(直接发送到队列还是通过交换机的方式发送)毫不关心。
在application.properties添加RabbitMQ基本配置4.0.0 org.springframework.boot spring-boot-starter-parent 2.4.5 com.practice.springboot rabbitmqtest 0.0.1-SNAPSHOT rabbitmqtest rabbitmqTest 1.8 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-test test
# ip spring.rabbitmq.host=127.0.0.1 # 端口 spring.rabbitmq.port=5672 # 用户名 spring.rabbitmq.username=guest # 密码 spring.rabbitmq.password=guest # 配置虚拟机 spring.rabbitmq.virtual-host=/Routing工作模式(Direct Exchange类型)示例 编写生产者
package com.practice.springboot.rabbitmqtest.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class DirectRabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String vHost;
// 交换机
public static final String TestDirectExchange = "TestDirectExchange";
// 消息队列
public static final String TestDirectQueue = "TestDirectQueue";
// RoutingKey
public static final String TestDirectRouting = "TestDirectRouting";
// 可以省略,使用默认的连接工厂
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(host);
cachingConnectionFactory.setPort(port);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
cachingConnectionFactory.setVirtualHost(vHost);
// 可以不配置,默认生产者不确认发送应答
cachingConnectionFactory.setPublisher/confirm/iType(CachingConnectionFactory./confirm/iType.CORRELATED);
return cachingConnectionFactory;
}
// 可以省略,使用默认的RabbitTemplate
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
// 用于在RabbitMq服务器上生成交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange(TestDirectExchange, true, false);
}
// 用于在RabbitMq服务器上生成消息队列
@Bean
public Queue testDirectQueue() {
// durable:是否持久化,默认是false; 持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在; 暂存队列:当前连接有效。
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:默认false,是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
// 一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue(TestDirectQueue, true);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(testDirectQueue()).to(directExchange()).with(TestDirectRouting);
}
}
生产者发送消息测试示例
package com.practice.springboot.rabbitmqtest.test;
import com.practice.springboot.rabbitmqtest.config.DirectRabbitConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class TestDirectModel {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDirect() {
rabbitTemplate.convertAndSend(DirectRabbitConfig.TestDirectExchange,
DirectRabbitConfig.TestDirectRouting, "test direct exchange");
}
}
执行两次测试用例;
因为目前还没有消费者 ,消息还没有被消费,结果如下:
可见消息已经发送到RabbitMq服务器上了。
如果消费者和生产者在同一个模块,可以不需要下面的配置,直接写一个监听类即可。
导入maven依赖:application.properties配置org.springframework.boot spring-boot-starter-amqp
# ip spring.rabbitmq.host=127.0.0.1 # 端口 spring.rabbitmq.port=5672 # 用户名 spring.rabbitmq.username=guest # 密码 spring.rabbitmq.password=guest # 配置虚拟机 spring.rabbitmq.virtual-host=/创建DirectRabbitMqConfig配置类
如果消费者单纯的接收消息,可以不用添加这个配置类,直接新建后面的监听器就好,使用注解来让监听器监听对应队列即可。配置上的话,消费者也可以作为生产者,也能推送消息。
@Configuration
public class DirectRabbitMqConfig {
@Bean
public Queue testDirectQueue() {
// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("TestDirectQueue", true);
}
@Bean
public DirectExchange testDirectExchange() {
return new DirectExchange("TestDirectExchange", true, false);
}
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("TestDirectRouting");
}
}
添加消息监听类
package com.practice.springboot.rabbitmqtest.controller;
import com.practice.springboot.rabbitmqtest.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
public class RabbitMqConsumer {
@RabbitListener(queues = DirectRabbitConfig.TestDirectQueue)
public void receive(Message message) {
System.out.println("message = " + message);
}
}
测试结果
项目启动,可以看到之前的两条消息被消费了
2021-11-10 17:40:06.795 INFO 8708 --- [ main] c.p.s.r.RabbitmqtestApplication : Started RabbitmqtestApplication in 2.638 seconds (JVM running for 3.623)
message = GenericMessage [payload=test direct exchange, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=TestDirectExchange, amqp_deliveryTag=1, amqp_consumerQueue=TestDirectQueue, amqp_redelivered=true, amqp_receivedRoutingKey=TestDirectRouting, timestamp_in_ms=1636470108552, amqp_contentEncoding=UTF-8, amqp_timestamp=Tue Nov 09 23:01:48 CST 2021, id=039d4adf-25cf-a278-701a-9923d49abf8c, amqp_consumerTag=amq.ctag-OTugfCIczA7blzOcnooPcA, amqp_lastInBatch=false, contentType=text/plain, timestamp=1636537206802}]
message = GenericMessage [payload=test direct exchange, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=TestDirectExchange, amqp_deliveryTag=2, amqp_consumerQueue=TestDirectQueue, amqp_redelivered=false, amqp_receivedRoutingKey=TestDirectRouting, timestamp_in_ms=1636470185702, amqp_contentEncoding=UTF-8, amqp_timestamp=Tue Nov 09 23:03:05 CST 2021, id=dbc839cd-2691-d123-22c4-27baf957ef35, amqp_consumerTag=amq.ctag-OTugfCIczA7blzOcnooPcA, amqp_lastInBatch=false, contentType=text/plain, timestamp=1636537235602}]
Topic工作模式(Topic Exchange类型交换机)
生产者
配置Topic 交换机
package com.practice.springboot.rabbitmqtest.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicModelRabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String vHost;
// 交换机
public static final String TestTopicExchange = "TestTopicExchange";
public static final String TestTopicQueue1 = "TestTopicQueue1";
public static final String TestTopicQueue2 = "TestTopicQueue2";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TestTopicExchange, true, false);
}
@Bean
public Queue firstQueue() {
return new Queue(TestTopicQueue1, true);
}
@Bean
public Queue secondQueue() {
return new Queue(TestTopicQueue2, true);
}
@Bean
public Binding bindingFirst() {
return BindingBuilder.bind(firstQueue()).to(topicExchange()).with("topic.man");
}
@Bean
public Binding bindingSecond() {
return BindingBuilder.bind(secondQueue()).to(topicExchange()).with("topic.#");
}
}
生产者发送消息
package com.practice.springboot.rabbitmqtest.test;
import com.practice.springboot.rabbitmqtest.config.TopicModelRabbitConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class TestTopicModel {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testTopic1() {
rabbitTemplate.convertAndSend(TopicModelRabbitConfig.TestTopicExchange, "topic.man",
"test topic exchange 1");
}
@Test
public void testTopic2() {
rabbitTemplate.convertAndSend(TopicModelRabbitConfig.TestTopicExchange, "topic.woman", "test topic exchange 2");
}
}
testTopic1方法发送消息,TestTopicQueue1 、TestTopicQueue2都收到消息;
testTopic2方法发送消息,只有TestTopicQueue2队列收到消息。
@RabbitListener(queues = TopicModelRabbitConfig.TestTopicQueue1)
public void receiveTopic1(Message message) {
System.out.println(message.getPayload());
}
@RabbitListener(queues = TopicModelRabbitConfig.TestTopicQueue2)
public void receiveTopic2(Message message) {
System.out.println(message.getPayload());
}
测试结果
可以看到,TestTopicQueue1 的一条消息、TestTopicQueue2的两条消息都被消费了
test topic exchange 1 test topic exchange 1 test topic exchange 2发布订阅工作模式(fanout exchange扇形交换机)
发布定义模式使用的是扇形交换机,只要绑定到交换机上的队列都会接收到消息,路由无效。
生产者 配置扇形交换机package com.practice.springboot.rabbitmqtest.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitMqConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("TestFanoutExchange", true, false);
}
@Bean
public Queue queueA() {
return new Queue("TestQueueA", true);
}
@Bean
public Queue queueB() {
return new Queue("TestQueueB", true);
}
@Bean
public Queue queueC() {
return new Queue("TestQueueC", true);
}
// bind queue and exchange
@Bean
public Binding bindingA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
public Binding bindingB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
public Binding bindingC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
生产者发送消息
@Test
public void testTopic1() {
rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout exchange");
}
消息被发送到3个队列中。
消费者 @RabbitListener(queues = "TestQueueA")
public void receiveTopicA(Message message) {
System.out.println(message.getPayload());
}
@RabbitListener(queues = "TestQueueB")
public void receiveTopicB(Message message) {
System.out.println(message.getPayload());
}
@RabbitListener(queues = "TestQueueC")
public void receiveTopicC(Message message) {
System.out.println(message.getPayload());
}
测试结果
test fanout exchange test fanout exchange test fanout exchange生产者发送事务消息
事务消息,指生产者发送一组消息,消费者要么全部收到,要么都收不到。
使用上面扇形交换机的例子来展示事务收发。
在fanout exchange 代码的基础上添加了rabbitMq的事务管理器:RabbitTransactionManager
package com.practice.springboot.rabbitmqtest.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitMqConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("TestFanoutExchange", true, false);
}
// 配置启用rabbitmq事务
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public Queue queueA() {
return new Queue("TestQueueA", true);
}
@Bean
public Queue queueB() {
return new Queue("TestQueueB", true);
}
@Bean
public Queue queueC() {
return new Queue("TestQueueC", true);
}
// bind queue and exchange
@Bean
public Binding bindingA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
public Binding bindingB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
public Binding bindingC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
生产者发送消息
不能使用之前的测试用例的方式来测试,否则会导致事务管理器多次初始化,无法正常测试。
下面写一个Rest API接口用于测试。
package com.practice.springboot.rabbitmqtest.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
@RestController
public class RabbitMqProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setChannelTransacted(true);
}
@GetMapping("/sendMsg")
@Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
public void testTransactionMessage() throws InterruptedException {
rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout transaction exchange1");
rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout transaction exchange2");
// 模拟异常
System.out.println(1/0);
}
}
- 在发送消息的testTransactionMessage()方法上,加了@Transactional注解,表示这个方法将启用事务(此时的事务即是RabbitMQ事务,因为使用了RabbitTransactionManager )。
- 启用事务,需要在系统初始化时,调用rabbitTemplate.setChannelTransacted(true),以激活rabbitTemplate对象事务处理功能。
调用接口,发现异常,同时消费者并没有收到消息。
2021-11-10 20:45:06.637 ERROR 4560 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause java.lang.ArithmeticException: / by zero at com.practice.springboot.rabbitmqtest.controller.RabbitMqProvider.testTransactionMessage(RabbitMqProvider.java:34) ~[classes/:na]关闭事务,消息正常发送 不启用通道事务功能
@PostConstruct
public void init() {
// rabbitTemplate.setChannelTransacted(true);
}
可以看到消息正常发出了。
2021-11-10 20:39:47.072 INFO 3480 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 0 ms test fanout transaction exchange1 test fanout transaction exchange1 test fanout transaction exchange1 test fanout transaction exchange2 test fanout transaction exchange2 test fanout transaction exchange2 2021-11-10 20:39:47.115 ERROR 3480 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause不使用事务注解
@GetMapping("/sendMsg")
// @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
public void testTransactionMessage() throws InterruptedException {
rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout transaction exchange1");
rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout transaction exchange2");
// generate exception
System.out.println(1/0);
}
可以看到消息也正常发出了。
2021-11-10 20:42:49.950 INFO 13532 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms test fanout transaction exchange1 test fanout transaction exchange1 test fanout transaction exchange1 test fanout transaction exchange2 test fanout transaction exchange2 test fanout transaction exchange2 2021-11-10 20:42:49.997 ERROR 13532 --- [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause参考
Spring Boot整合RabbitMQ详细教程



