DirectExchange 路由策略是将消息队列绑定到 DirectExchange 上,当 一条消息到达DirectExchange 时会被转发到与该条消息
routing key 相同的 Queue 上,例如消息队列名为“hello-queue ”,则 routingkey 为“hello-queue ”的消息会被该消息队列接收。
创建项目,并添加依赖
org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine org.springframework.boot spring-boot-starter-amqp
配置文件
spring.application.name=rabbitmq-demo02 spring.rabbitmq.password=guest spring.rabbitmq.username=guest spring.rabbitmq.port=5672 spring.rabbitmq.host=192.168.100.120 # 设置交换器名称 mq.config.exchange=log.direct # info 队列名称 mq.config.queue.info=log.info # info 路由键
消费者
package com.bobo.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true")
,exchange = @Exchange(value = "${mq.config.exchange}"
,type = ExchangeTypes.DIRECT)
,key = "${mq.config.queue.error.routing.key}"
)
) p
ublic class ErrorRecevier {
@RabbitHandler
public void process(String msg){
System.out.println("error....recevier:" + msg);
}
} p
ackage com.bobo.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true")
,exchange = @Exchange(value = "${mq.config.exchange}"
,type = ExchangeTypes.DIRECT)
,key = "${mq.config.queue.info.routing.key}"
)
) p
ublic class InfoRecevier {
@RabbitHandler
public void process(String msg){
System.out.println("info....recevier:" + msg);
}
}
1.2 创建生产者
创建一个SpringBoot项目,添加和上面一样的依赖
配置文件有区别,不需要添加队列的配置信息
spring.application.name=rabbitmq-demo03 添加生产者的类 测试效果 spring.rabbitmq.password=guest spring.rabbitmq.username=guest spring.rabbitmq.port=5672 spring.rabbitmq.host=192.168.100.120 # 设置交换器名称 mq.config.exchange=log.direct # info 路由键 mq.config.queue.info.routing.key=log.info.routing.key # error 路由键 mq.config.queue.error.routing.key=log.error.routing.key
添加生产者的类
package com.bobo.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private AmqpTemplate template;
@Value("${mq.config.exchange}")
private String exchange;
@Value("${mq.config.queue.info.routing.key}")
private String routingKey;
public void send(String msg){
// 发送消息
template.convertAndSend(exchange,routingKey,msg);
}
} @
SpringBootTest
class RabbitmqDemo03ApplicationTests {
@Autowired
private Sender sender;
@Test
void contextLoads() {
sender.send("Hello RabbitMQ .... ");
}
}
测试效果
TopicExchange 是比较复杂也比较灵活的 种路由策略,在TopicExchange 中,Queue 通过routingkey 绑定到 TopicExchange 上,当消息到
达 TopicExchange 后,TopicExchange 根据消息的routingkey 消息路由到一个或者多 Queue上,相比direct模式topic会更加的灵活些。
package com.bobo.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true")
,exchange = @Exchange(
value = "${mq.config.exchange}"
,type = ExchangeTypes.TOPIC)
,key = "*.log.error"
)
) p
ublic class ErrorRecevier {
@RabbitHandler
public void process(String msg){
System.out.println("error ... recevier:" + msg);
}
} p
ackage com.bobo.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class OrderSender {
@Autowired
private AmqpTemplate template;
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg){
template.convertAndSend(exchange,"Order.log.debug","Order log debug"+msg);
template.convertAndSend(exchange,"Order.log.info","Order log info"+msg);
template.convertAndSend(exchange,"Order.log.error","Order log error"+msg);
template.convertAndSend(exchange,"Order.log.warn","Order log warn"+msg);
}
}
3. Fanout案例
FanoutExchange 的数据交换策略是把所有到达 FanoutExchang 的消息转发给所有与它绑定的Queue ,在这种策略中, routingkey 将
不起任何作用.
package com.bobo.consumer;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "true")
,exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.FANOUT)
)
) p
ublic class SmsRecevier {
@RabbitHandler
public void process(String msg){
System.out.println("Sms .... recevider:" + msg);
}
}
p
ackage com.bobo.provider;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
二、RabbitMQ高级
1. 持久化
消息的可靠性是RabbitMQ的一大特色,RabbitMQ是如何保证消息的可靠性的呢?–> 消息的持久化
创建消费者
注意,此时我们需要设置autoDelete=false
创建服务提供者
@Component
public class Sender {
@Autowired
private AmqpTemplate template;
@Value("${mq.config.exchange}")
private String exchange;
public void send(String msg){
template.convertAndSend(exchange,"",msg);
}
}
单元测试
当消费者处理了一段时间的消息之后,断开连接,然后消费者再上线我们发现消费者又能够处理掉下线后提供者发送的消息,保证了消息的
完整性
autoDelete属性可以在@Queue配置也可以在@Exchange配置,具体如下:
@Queue:当所有的消费者客户端连接断开后,是否自定删除队列 true:删除,false:不删除 @Exchange:当所有的绑定队列都不再使用时,是否自动删除交换器 true:删除,false:不删除2. ACK确认机制 2.1 什么是ACK
如果消息在处理过程中,消费者的服务器在处理消息时出现了异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢
失,为了确保数据不会丢失,RabbitMQ支持消息确认机制-ACK
ACK(Acknowledge Character)是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ的,RabbitMQ接收到反馈信息后才会将消息从队列中删除。
- 如果一个消费者在处理消息出现了网络不稳定,集群异常等现象,会将消息重新放入队列中。
- 如果在集群的情况下,RabbitMQ会立即将这个消息推送给这个在线的其他的消费者,这种机制保障了消费者在服务端故障的时候不会
丢失任何的数据和任务 - 消息永远不会从RabbitMQ中删除:只有当消费者正确发送ACK反馈后,RabbitMQ收到确认后,消息才会从RabbitMQ的服务中删除
- 消息的ACK机制默认就是打开的
ACK的验证
在服务端我们给出一个错误
然后我们再去掉错误,发现消息会被正常的消费
ACK的注意事项
如果忘记掉ACK,那么后果会比较严重,当Consumer退出时,Message会一直重复分发,然后RabbitMQ会占用越来越多的内存,由于
RabbitMQ会长时间的运行,因此这个 内存泄漏 是致命的,我们可以通过设置重试次数来防止这个问题,在Consumer的
application.properties中设置如下参数
spring.rabbitmq.listener.simple.retry.enabled=true ## 设置重试次数 spring.rabbitmq.listener.simple.retry.max-attempts=5 ## 重试的间隔时间 spring.rabbitmq.listener.simple.retry.initial-interval=5000



