同步通讯:就像打电话,需要实时响应。
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的问题:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
异步通讯:就像发邮件,不需要马上回复。
好处:
- 吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
缺点:
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于Broker的可靠、安全、性能
- publisher:生产者
- consumer:消费者
- exchange:交换机,负责消息路由
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
基本消息队列的消息发送流程
1.建立connection
2. 创建channel
3. 利用channel声明队列
4. 利用channel向队列发送消息
基本消息队列的消息接收流程
1.建立connection
2. 创建channel
3. 利用channel声明队列
4. 定义consumer的消费行为handleDelivery()
5. 利用channel将消费者与队列绑定
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
1.简单消息模型步骤1.引入依赖
org.springframework.boot spring-boot-starter-amqp
2.修改配置
spring: rabbitmq: host: 192.168.56.134 port: 5672 virtual-host: / username: itcast password: 123321
3.发送消息:
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!-miaoxiaowen";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
4.接收消息:
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}}
2.WorkQueue 工作任务模型
让多个消费者绑定到一个队列,共同消费队列中的消息
1.创建多个RabbitListener监听同一个队列
2.多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
3.消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。
4.通过设置prefetch来控制消费者预取的消息数量
同一条消息可以被多个消费者消费
Publisher:生产者将消息发给交换机
Exchange:交换机,接收生产者发送的消息,并且根据类型处理消息到队列中。Exchange(交换机)只负责转发消息,不具备存储消息的能力
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Queue:消息队列。
Consumer:消费者
消息发送流程是这样的:
1可以有多个队列
2每个队列都要绑定到Exchange(交换机)
3生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
4交换机把消息发送给绑定过的所有队列
5订阅队列的消费者都能拿到消息
实现
1.声明队列和交换机
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
2.发送消息
// 队列名称 String exchangeName = "itcast.fanout"; // 消息 String message = "hello, everyone!"; rabbitTemplate.convertAndSend(exchangeName, "", message);
3.接收消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
交换机的作用
接收publisher发送的消息
将消息按照规则路由到与之绑定的队列
不能缓存消息,路由失败,消息丢失
FanoutExchange的会将消息路由到每个绑定的队列
不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
实现:
1.声明交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
2.消息发送
String exchangeName = "itcast.direct"; // 消息 String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!"; // 发送消息 rabbitTemplate.convertAndSend(exchangeName, "red", message);Direct交换机与Fanout交换机的差异
Fanout交换机将消息路由给每一个与之绑定的队列
Direct交换机根据RoutingKey判断路由给哪个队列
如果多个队列具有相同的RoutingKey,则与Fanout功能类似
基于@RabbitListener注解声明队列和交换机有哪些常见注解?
@Queue
@Exchange
通配符规则:
#:匹配一个或多个词
*:匹配不多不少恰好1个词
实现:
1.声明绑定交换机
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
Direct交换机与Topic交换机的差异
Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割
Topic交换机与队列绑定时的bindingKey可以指定通配符
#:代表0个或多个词
*:代表1个词
引入依赖
com.fasterxml.jackson.dataformat jackson-dataformat-xml 2.9.10
注入bean
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
消息的可靠性
常见的丢失原因包括:
发送时丢失:
生产者发送的消息未送达exchange
消息到达exchange后未到达queue
MQ宕机,queue将消息丢失
consumer接收到消息后未消费就宕机
消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突。
返回结果有两种方式
publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
实现
1.修改配置
spring: rabbitmq: publisher-confirm-type: correlated #开启publisher-confirm 两种类型:simple:同步等待confirm结果,直到超时; correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback publisher-returns: true #开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback template: mandatory: true #定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
2.定义Return回调
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目加载时配置
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投递失败,记录日志
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有业务需要,可以重发消息
});
}
}
3.定义ConfirmCallback
ConfirmCallback可以在发送消息时指定,因为每个业务处理confirm成功或失败的逻辑不一定相同。
// 1.消息体
String message = "hello, spring amqp!";
// 2.全局唯一的消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if(result.isAck()){
// 3.1.ack,消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
}else{
// 3.2.nack,消息失败
log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
}
},
ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
);
// 4.发送消息
rabbitTemplate.convertAndSend("task.direct", "task", message, correlationData);
mq持久化
交换机持久化
通过代码指定交换机持久化:
@Bean
public DirectExchange simpleExchange(){
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, false);
}
队列持久化
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
消息持久化
设置消息的属性(MessageProperties),指定delivery-mode:1:非持久化;2:持久化
manual:手动ack,需要在业务代码结束后,调用api发送ack。
auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
- none模式下,消息投递是不可靠的,可能丢失
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可。
实现:
1.修改配置
spring: rabbitmq: listener: simple: acknowledge-mode: auto # 关闭ack失败重试机制
本地重试:利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列
1.修改配置:
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初识的失败等待时长为1秒 multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
2。开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试,重试达到最大次数后,Spring会返回ack,消息会被丢弃。
3.在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
@Configuration
public class ErrorMessageConfig {
1)在consumer服务中定义处理失败消息的交换机和队列
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}
2)定义一个RepublishMessageRecoverer,关联队列和交换机
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}
死信交换机
死信:当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费(消息所在的队列设置了超时时间,消息本身设置了超时时间)
- 要投递的队列消息满了,无法投递
死信交换机的使用场景
如果队列绑定了死信交换机,死信会投递到死信交换机;
可以利用死信交换机收集所有消费者处理失败的消息(死信),交由人工处理,进一步提高消息队列的可靠性。
死信队列:消息被消费者拒绝后进入死信交换机,然后进行死信队列,专门的服务取出死信进行处理
TTL(死信队列,延迟队列)场景:消费者延迟收到消息的效果
延迟发送短信
用户下单,如果用户在15 分钟内未支付,则自动取消
预约工作会议,20分钟后自动通知所有参会人员
消息超时的两种方式
- 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
如何实现发送一个消息20秒后消费者才收到消息?
- 给消息的目标队列(没有消费者)指定死信交换机
- 将消费者监听的队列绑定到死信交换机
- 发送消息时给消息设置超时时间为20秒
声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。发送消息时,一定要携带x-delay属性,指定延迟的时间:
声明一个交换机,添加delayed属性为true
发送消息时,添加x-delay头,值为超时时间
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。
惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。
1.基于bean:.lazy()
2.基于@RabbitListener arguments
惰性队列的优点
基于磁盘存储,消息上限高
没有间歇性的page-out,性能比较稳定
惰性队列的缺点
基于磁盘存储,消息时效性会降低
性能受限于磁盘的IO
增加更多消费者,提高消费速度。也就是我们之前说的work queue模式
扩大队列容积,提高堆积上限
队列上绑定多个消费者,提高消费速度
使用惰性队列,可以再mq中保存更多消息
是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息。
当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
队列所在节点宕机,队列中的消息就会丢失
镜像集群:
是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
一个队列的主节点可能是另一个队列的镜像节点
所有操作都是主节点完成,然后同步给镜像节点
主宕机后,镜像节点会替代成新的主



