- 启动虚拟机
- 启动RabbitMQ服务
- 创建生产者maven工程
- 导入依赖
- 创建配置文件
- 创建配置类
- 创建启动类
- 创建测试类
- 启动测试类
- 创建消费者maven工程
- 导入依赖
- 创建配置文件
- 创建启动类
- 创建消费者监听器
- 启动启动类
- 关闭防火墙:systemctl stop friewalld
- 开启服务:systemctl start rabbitmq-server.servcie
- 查看进程:ps -ef|grep rabbitmq
spring-boot-starter-parent
org.springframework.boot
2.3.6.RELEASE
org.springframework.boot
spring-boot-starter-amqp
org.springframework.boot
spring-boot-starter-test
创建配置文件
spring:
rabbitmq:
host: 192.168.137.130
port: 5672 #RabbitMQ默认端口号
virtual-host: /
username: admin
password: 199941
创建配置类
package com.atguigu.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
@SpringBootConfiguration//声明当前类是一个springboot配置类
public class RabbitMQConfig {
//准备一个字符串常量作为交换机名称
public static final String EXCHANGE_NAME = "springboot_topic_exchange";
//准备一个字符串常量作为队列名称
public static final String QUEUE_NAME = "springboot_queue";
//创建交换机
@Bean
//返回的类型为org.springframework.amqp.core包下的Exchange
public Exchange getExchange(){
ExchangeBuilder topicExchange = ExchangeBuilder.topicExchange(EXCHANGE_NAME);
Exchange exchange = topicExchange.durable(true).build();
return exchange;
}
//创建队列
@Bean
//返回的类型为org.springframework.amqp.core包下的Queue
public Queue getQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//将队列绑定到交换机
@Bean
//返回的类型为org.springframework.amqp.core包下的Binding
//@Qualifier注解:在注入的过程中先根据类型注入,如果根据类型无法注入的话,根据属性名去容器里面找该属性
public Binding bindQueueToExchangeE(Exchange exchange, Queue queue){
//将队列绑定到交换机上
//with方法:指定路由键
//noargs方法:没有参数
return BindingBuilder.bind(queue).to(exchange).with("spring-boot.#").noargs();
}
}
创建启动类
package com.atguigu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication//标注当前类为springboot启动类
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
创建测试类
package com.atguigu;
import com.atguigu.config.RabbitMQConfig;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest//标注当前类为springboot测试类
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
//测试发送消息
@Test
public void testSendMessage(){
//使用 rabbitTemplate 的 convertAndSend 方法,将消息进行转换之后发送到了 RabbitMQ Server 中
//其中第一个参数为交换机名称
//第二个参数为路由key
//第三个参数要发送的信息
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"spring-boot.haha","测试spring整合RabbitMQ");
}
}
启动测试类
可以看到队列中产生了一条消息
spring-boot-starter-parent
org.springframework.boot
2.3.6.RELEASE
org.springframework.boot
spring-boot-starter-amqp
创建配置文件
spring:
rabbitmq:
host: 192.168.137.130
port: 5672 #RabbitMQ默认端口号
virtual-host: /
username: admin
password: 199941
创建启动类
package com.atguigu;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class,args);
}
}
创建消费者监听器
package com.atguigu.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQConsumerListener {
//指定监听队列
@RabbitListener(queues = "springboot_queue")
//这里传入的参数是org.springframework.amqp.core下的message
public void getMessage(Message message){
//获取队列中的信息
MessageProperties messageProperties = message.getMessageProperties();
//获取交换机名称
System.out.println("messageProperties.getReceivedExchange() = " + messageProperties.getReceivedExchange());
//获取路由键
System.out.println("messageProperties.getReceivedRoutingKey() = " + messageProperties.getReceivedRoutingKey());
//获取消费者
System.out.println("messageProperties.getConsumerTag() = " + messageProperties.getConsumerTag());
//获取交付标签
System.out.println("messageProperties.getDeliveryTag() = " + messageProperties.getDeliveryTag());
//获取信息
System.out.println("new String(message.getBody()) = " + new String(message.getBody()));
}
}
启动启动类
可以看到控制台获取到了生产者发送的消息以及消息队列中的信息



