栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Spring Boot整合RabbitMQ-RabbitMQ

Spring Boot整合RabbitMQ-RabbitMQ

概述

消息中间件的应用场景主要有:异步处理、应用解耦、流量削峰等。
生产者发送消息通过不同类型的交互机发送到不同的消息队列中。
消费者只关心消息队列,与交换机无关,至于生产者如何发送的(直接发送到队列还是通过交换机的方式发送)毫不关心。

导入maven依赖


    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.4.5
         
    
    com.practice.springboot
    rabbitmqtest
    0.0.1-SNAPSHOT
    rabbitmqtest
    rabbitmqTest
    
        1.8
    
    
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        

在application.properties添加RabbitMQ基本配置
# ip
spring.rabbitmq.host=127.0.0.1
# 端口
spring.rabbitmq.port=5672
# 用户名
spring.rabbitmq.username=guest
# 密码
spring.rabbitmq.password=guest
# 配置虚拟机
spring.rabbitmq.virtual-host=/
Routing工作模式(Direct Exchange类型)示例 编写生产者
package com.practice.springboot.rabbitmqtest.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Configurable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;


@Configuration
public class DirectRabbitConfig {
    

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String vHost;

    // 交换机
    public static final String TestDirectExchange = "TestDirectExchange";

    // 消息队列
    public static final String TestDirectQueue = "TestDirectQueue";

    // RoutingKey
    public static final String TestDirectRouting = "TestDirectRouting";

    // 可以省略,使用默认的连接工厂
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setHost(host);
        cachingConnectionFactory.setPort(port);
        cachingConnectionFactory.setUsername(username);
        cachingConnectionFactory.setPassword(password);
        cachingConnectionFactory.setVirtualHost(vHost);
        // 可以不配置,默认生产者不确认发送应答
        cachingConnectionFactory.setPublisher/confirm/iType(CachingConnectionFactory./confirm/iType.CORRELATED);
        return cachingConnectionFactory;
    }

    // 可以省略,使用默认的RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

	// 用于在RabbitMq服务器上生成交换机
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(TestDirectExchange, true, false);
    }
	
	// 用于在RabbitMq服务器上生成消息队列
    @Bean
    public Queue testDirectQueue() {
        // durable:是否持久化,默认是false; 持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在; 暂存队列:当前连接有效。
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:默认false,是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        // 一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue(TestDirectQueue, true);
    }

    
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(testDirectQueue()).to(directExchange()).with(TestDirectRouting);
    }
}
生产者发送消息测试示例
package com.practice.springboot.rabbitmqtest.test;

import com.practice.springboot.rabbitmqtest.config.DirectRabbitConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TestDirectModel {


    //注入rabbitTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testDirect() {
        rabbitTemplate.convertAndSend(DirectRabbitConfig.TestDirectExchange,
                DirectRabbitConfig.TestDirectRouting, "test direct exchange");
    }
}

执行两次测试用例;
因为目前还没有消费者 ,消息还没有被消费,结果如下:

可见消息已经发送到RabbitMq服务器上了。

编写消费者

如果消费者和生产者在同一个模块,可以不需要下面的配置,直接写一个监听类即可。

导入maven依赖:

 
     org.springframework.boot
     spring-boot-starter-amqp
 
application.properties配置
# ip
spring.rabbitmq.host=127.0.0.1
# 端口
spring.rabbitmq.port=5672
# 用户名
spring.rabbitmq.username=guest
# 密码
spring.rabbitmq.password=guest
# 配置虚拟机
spring.rabbitmq.virtual-host=/
创建DirectRabbitMqConfig配置类

如果消费者单纯的接收消息,可以不用添加这个配置类,直接新建后面的监听器就好,使用注解来让监听器监听对应队列即可。配置上的话,消费者也可以作为生产者,也能推送消息。

@Configuration
public class DirectRabbitMqConfig {
 
    
    @Bean
    public Queue testDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("TestDirectQueue", true);
    }
 
    
    @Bean
    public DirectExchange testDirectExchange() {
        return new DirectExchange("TestDirectExchange", true, false);
    }
 
    
    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("TestDirectRouting");
    }

}
添加消息监听类
package com.practice.springboot.rabbitmqtest.controller;

import com.practice.springboot.rabbitmqtest.config.DirectRabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class RabbitMqConsumer {
    @RabbitListener(queues = DirectRabbitConfig.TestDirectQueue)
    public void receive(Message message) {
        System.out.println("message = " + message);
    }
}
测试结果

项目启动,可以看到之前的两条消息被消费了

2021-11-10 17:40:06.795  INFO 8708 --- [           main] c.p.s.r.RabbitmqtestApplication          : Started RabbitmqtestApplication in 2.638 seconds (JVM running for 3.623)
message = GenericMessage [payload=test direct exchange, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=TestDirectExchange, amqp_deliveryTag=1, amqp_consumerQueue=TestDirectQueue, amqp_redelivered=true, amqp_receivedRoutingKey=TestDirectRouting, timestamp_in_ms=1636470108552, amqp_contentEncoding=UTF-8, amqp_timestamp=Tue Nov 09 23:01:48 CST 2021, id=039d4adf-25cf-a278-701a-9923d49abf8c, amqp_consumerTag=amq.ctag-OTugfCIczA7blzOcnooPcA, amqp_lastInBatch=false, contentType=text/plain, timestamp=1636537206802}]
message = GenericMessage [payload=test direct exchange, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=TestDirectExchange, amqp_deliveryTag=2, amqp_consumerQueue=TestDirectQueue, amqp_redelivered=false, amqp_receivedRoutingKey=TestDirectRouting, timestamp_in_ms=1636470185702, amqp_contentEncoding=UTF-8, amqp_timestamp=Tue Nov 09 23:03:05 CST 2021, id=dbc839cd-2691-d123-22c4-27baf957ef35, amqp_consumerTag=amq.ctag-OTugfCIczA7blzOcnooPcA, amqp_lastInBatch=false, contentType=text/plain, timestamp=1636537235602}]
Topic工作模式(Topic Exchange类型交换机) 生产者 配置Topic 交换机
package com.practice.springboot.rabbitmqtest.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class TopicModelRabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String vHost;

    // 交换机
    public static final String TestTopicExchange = "TestTopicExchange";

    public static final String TestTopicQueue1 = "TestTopicQueue1";

    public static final String TestTopicQueue2 = "TestTopicQueue2";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TestTopicExchange, true, false);
    }

    @Bean
    public Queue firstQueue() {
        return new Queue(TestTopicQueue1, true);
    }

    @Bean
    public Queue secondQueue() {
        return new Queue(TestTopicQueue2, true);
    }

    
    @Bean
    public Binding bindingFirst() {
        return BindingBuilder.bind(firstQueue()).to(topicExchange()).with("topic.man");
    }

    
    @Bean
    public Binding bindingSecond() {
        return BindingBuilder.bind(secondQueue()).to(topicExchange()).with("topic.#");
    }
}
生产者发送消息
package com.practice.springboot.rabbitmqtest.test;

import com.practice.springboot.rabbitmqtest.config.TopicModelRabbitConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class TestTopicModel {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testTopic1() {
        rabbitTemplate.convertAndSend(TopicModelRabbitConfig.TestTopicExchange, "topic.man",
                "test topic exchange 1");
    }

    @Test
    public void testTopic2() {
        rabbitTemplate.convertAndSend(TopicModelRabbitConfig.TestTopicExchange, "topic.woman", "test topic exchange 2");
    }
}

testTopic1方法发送消息,TestTopicQueue1 、TestTopicQueue2都收到消息;
testTopic2方法发送消息,只有TestTopicQueue2队列收到消息。

消费者
    @RabbitListener(queues = TopicModelRabbitConfig.TestTopicQueue1)
    public void receiveTopic1(Message message) {
        System.out.println(message.getPayload());
    }

    @RabbitListener(queues = TopicModelRabbitConfig.TestTopicQueue2)
    public void receiveTopic2(Message message) {
        System.out.println(message.getPayload());
    }
测试结果

可以看到,TestTopicQueue1 的一条消息、TestTopicQueue2的两条消息都被消费了

test topic exchange 1
test topic exchange 1
test topic exchange 2
发布订阅工作模式(fanout exchange扇形交换机)

发布定义模式使用的是扇形交换机,只要绑定到交换机上的队列都会接收到消息,路由无效。

生产者 配置扇形交换机
package com.practice.springboot.rabbitmqtest.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FanoutRabbitMqConfig {
    

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("TestFanoutExchange", true, false);
    }

    @Bean
    public Queue queueA() {
        return new Queue("TestQueueA", true);
    }

    @Bean
    public Queue queueB() {
        return new Queue("TestQueueB", true);
    }

    @Bean
    public Queue queueC() {
        return new Queue("TestQueueC", true);
    }

    // bind queue and exchange
    @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}
生产者发送消息
    @Test
    public void testTopic1() {
        rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout exchange");
    }

消息被发送到3个队列中。

消费者
    @RabbitListener(queues = "TestQueueA")
    public void receiveTopicA(Message message) {
        System.out.println(message.getPayload());
    }

    @RabbitListener(queues = "TestQueueB")
    public void receiveTopicB(Message message) {
        System.out.println(message.getPayload());
    }

    @RabbitListener(queues = "TestQueueC")
    public void receiveTopicC(Message message) {
        System.out.println(message.getPayload());
    }
测试结果
test fanout exchange
test fanout exchange
test fanout exchange
生产者发送事务消息

事务消息,指生产者发送一组消息,消费者要么全部收到,要么都收不到。
使用上面扇形交换机的例子来展示事务收发。

配置事务管理器

在fanout exchange 代码的基础上添加了rabbitMq的事务管理器:RabbitTransactionManager

package com.practice.springboot.rabbitmqtest.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class FanoutRabbitMqConfig {
    

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("TestFanoutExchange", true, false);
    }

    // 配置启用rabbitmq事务
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    @Bean
    public Queue queueA() {
        return new Queue("TestQueueA", true);
    }

    @Bean
    public Queue queueB() {
        return new Queue("TestQueueB", true);
    }

    @Bean
    public Queue queueC() {
        return new Queue("TestQueueC", true);
    }

    // bind queue and exchange
    @Bean
    public Binding bindingA() {
        return BindingBuilder.bind(queueA()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingB() {
        return BindingBuilder.bind(queueB()).to(fanoutExchange());
    }

    @Bean
    public Binding bindingC() {
        return BindingBuilder.bind(queueC()).to(fanoutExchange());
    }
}
生产者发送消息

不能使用之前的测试用例的方式来测试,否则会导致事务管理器多次初始化,无法正常测试。
下面写一个Rest API接口用于测试。

package com.practice.springboot.rabbitmqtest.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.PostConstruct;


@RestController
public class RabbitMqProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setChannelTransacted(true);
    }

    @GetMapping("/sendMsg")
    @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
    public void testTransactionMessage() throws InterruptedException {
        rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout transaction exchange1");
        rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout transaction exchange2");
        // 模拟异常
        System.out.println(1/0);
    }
}
  • 在发送消息的testTransactionMessage()方法上,加了@Transactional注解,表示这个方法将启用事务(此时的事务即是RabbitMQ事务,因为使用了RabbitTransactionManager )。
  • 启用事务,需要在系统初始化时,调用rabbitTemplate.setChannelTransacted(true),以激活rabbitTemplate对象事务处理功能。
测试 异常,消息都没有发出

调用接口,发现异常,同时消费者并没有收到消息。

2021-11-10 20:45:06.637 ERROR 4560 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause

java.lang.ArithmeticException: / by zero
	at com.practice.springboot.rabbitmqtest.controller.RabbitMqProvider.testTransactionMessage(RabbitMqProvider.java:34) ~[classes/:na]
关闭事务,消息正常发送 不启用通道事务功能
    @PostConstruct
    public void init() {
//        rabbitTemplate.setChannelTransacted(true);
    }

可以看到消息正常发出了。

2021-11-10 20:39:47.072  INFO 3480 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
test fanout transaction exchange1
test fanout transaction exchange1
test fanout transaction exchange1
test fanout transaction exchange2
test fanout transaction exchange2
test fanout transaction exchange2
2021-11-10 20:39:47.115 ERROR 3480 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause
不使用事务注解
    @GetMapping("/sendMsg")
//    @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
    public void testTransactionMessage() throws InterruptedException {
        rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout transaction exchange1");
        rabbitTemplate.convertAndSend("TestFanoutExchange", null, "test fanout transaction exchange2");
        // generate exception
        System.out.println(1/0);
    }

可以看到消息也正常发出了。

2021-11-10 20:42:49.950  INFO 13532 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
test fanout transaction exchange1
test fanout transaction exchange1
test fanout transaction exchange1
test fanout transaction exchange2
test fanout transaction exchange2
test fanout transaction exchange2
2021-11-10 20:42:49.997 ERROR 13532 --- [nio-8080-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root cause
参考

Spring Boot整合RabbitMQ详细教程

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/467604.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号