文章目录提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
前言交换机类型交换机属性Direct exchange(直连交换机)Fanout exchange(扇型交换机)Topic exchange(主题交换机)消息持久化
前言
关于rabbitmq安装就不说了,直接安装下来是这个界面,默认账号 密码都是:guest,系统管理员角色,但是这个角色只能在本地使用,不可远程登陆,想要远程登录需要重写添加用户分配角色。
在可视化端可以手动添加交换机,队列,消息,创建用户,分配权限。
了解一下mq的生产消费流程,认真看别走神,要思考,带着疑问看下面的。
交换机有四种,消息发布(生产者)是发给哪一种交换机?交换机再把消息给到队列,给的又是哪个队列?订阅者又取哪个队列的消息?
写代码之前了解一下交换机都有哪些,里面都有啥属性。
交换机类型Direct exchange(直连交换机)
Fanout exchange(扇型交换机)
Topic exchange(主题交换机)
Headers exchange(头交换机)
amq.* exchanges 默认交换机
交换机属性Name 交换机名称
Type 交换机类型direct、topic、 fanout、 headers
Durability 是否需要持久化。如果持久化,则RabbitMQ重启后,交换机还存在
Auto-delete 当最后一个绑定到Exchange 上的队列删除后,自动删除该Exchange
Internal 当前Exchange是否于RabbitMQ内部使用,默认为False
导入依赖:
org.springframework.boot
spring-boot-starter-amqp
配置yml
server:
port: 8080
spring:
#给项目来个名字
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
Direct exchange(直连交换机)
@Configuration
public class DirectRabbitConfig {
//队列 起名:TestDirectQueue
@Bean
public Queue TestDirectQueue(){
return new Queue("TestDirectQueue",true);
}
//Direct交换机 起名:TestDirectExchange
@Bean
DirectExchange TestDirectExchange(){
return new DirectExchange("TestDirectExchange",true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting(路由键)
@Bean
Binding bindingDirect(){
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting");
}
思想就是,定义一个队列,定义一个交换机,OK绑定他们,发消息的时候就可以把消息发送到定义的交换机,交换机把消息给绑定的队列,一个交换机可以绑定多个队列。
消息推送:
@Resource
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法
//直连交换机
@GetMapping("/sendDirectMessage")
public String sendDirectMessage() {
//设置值
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "Direct Exchange test message, hello!";
String createTime = DateUtil.now();
//封装map一次发送
Map map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
//将消息携带绑定键值:TestDirectRouting(路由键)
// 发送到交换机TestDirectExchange
rabbitTemplate.convertAndSend(
"TestDirectExchange",
"TestDirectRouting",
map);
return "ok";
}
消息已经加到交换机对应的队列下了。
创建消费模块,pom依赖不变,yml配置也不变。
@Component
@RabbitListener(queues = "TestDirectQueue")//注:监听的队列名称:TestDirectQueue
public class DirectReceiverService {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("DirectReceiver消费者收到消息 :"+testMessage.toString());
}
}
收到消息,接收成功,实时消费。
注意:直连交换机是一个消费者监听一个队列,如果两个消费者同时监听一个队列,会出现轮询消费消息,消息不会重复。直白点儿就是消费者A,B都监听TestDirectQueue就会出现轮询消费的情况。
道理一样形式不同,上代码
@Configuration
public class FanoutRabbitConfig {
@Resource
private RabbitTemplate rabbitTemplate;
@Bean
public Queue queueA() {
return new Queue("fanout.A",true);
}
@Bean
public Queue queueB() {
return new Queue("fanout.B",true);
}
@Bean
public Queue queueC() {
return new Queue("fanout.C",true);
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange",true,false);
}
@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
创建三个队列,一个交换机,将他们绑定在一起,不需要通过路由键绑定,定义了也无效。
推送消息
//扇型交换机
@GetMapping("/sendFanoutMessage")
public String sendFanoutMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = "message: testFanoutMessage ";
String createTime = DateUtil.now();
Map map = new HashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);
rabbitTemplate.convertAndSend("fanoutExchange", null, map);
return "ok";
}
这里的路由键直接设为null就行
消费类:
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString());
}
}
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverB消费者收到消息 : " +testMessage.toString());
}
}
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("FanoutReceiverC消费者收到消息 : " +testMessage.toString());
}
}
只要绑定了交换机,三个监听类会同时受到这条消息
直接上代码
@Configuration
public class TopicRabbitConfig {
/
@Component
@RabbitListener(queues = "topic.woman")//注:监听的队列名称:topic.woman
public class TopicWomanReceiver {
@RabbitHandler
public void process(Map testMessage) {
System.out.println("TopicTotalReceiver woman消费者收到消息 : " + testMessage.toString());
}
}
需要设置交换机和队列都是持久化状态就可以了,消息默认是持久化的。
1是非持久化
本篇暂先到这里了,想继续深入了解自动手动确认,死信号,AMQP协议,以及队列长度,消息大小限制,后续再见。



