消息队列MQ(一)——RabbitMQ的介绍、安装以及管理页面的使用
消息队列MQ(二)——Spring Boot整合RabbitMQ
消息队列MQ(三)——RabbitMQ高级特性与RabbitMQ集群搭建
文章目录
- 系列文章目录
- 前言
- 一、RabbitMQ高级特性
- 1、生产者确认
- 1.1、/confirm/i确认模式
- 1.2、return退回模式
- 1.3 小结
- 2、消费者确认
- 2.1 消费者确认demo(手动确认)
- 2.2 小结
- 3、消费端限流
- 3.1 消费端限流demo
- 3.2 小结
- 4、TTL(消息存活时间)
- 4.1 demo演示
- 4.2 小结
- 5、死信队列
- 5.1 死信队列demo
- 5.2 小结
- 6、延迟队列
- 二、RabbitMQ应用问题---消息幂等性处理
- 三、RabbitMQ集群搭建
- 1、集群方案原理
- 2、集群搭建
- 2.1、启动三个启动RabbitMQ
- 2.2、进入RabbitMQ容器m1,重置rabbitmq服务
- 2.3、进入RabbitMQ容器m2,加入集群:连接节点1rabbitmq服务
- 2.4、进入RabbitMQ容器m3,加入集群:连接节点1rabbitmq服务
- 2.5、启动完成查看集群状态
- 3、集群存在问题
前言
上一篇文章对RabbitMQ做了简单的应用,那我们有没有考虑过万一生产者发过去了,消费者没收到怎么办?有很多消费者但我又不想无限的创建消息队列怎么办?这一篇我们就谈一谈MQ在应用中可能产生的问题以及如何应对。这样,下次我们“做饭”的时候,不仅知道怎么做,还知道太咸了怎么补救一下。
提示:以下是本篇文章正文内容,下面案例可供参考
一、RabbitMQ高级特性TCP/IP有三次握手,四次挥手,为的就是知道消息的可达性和双方消息的均等性,那么到MQ这里,怎么解决呢?
准备好demo
1.编写MessageController用于向消息队列发送消息:
@RestController
public class MessageController {
//注入RabbitMQ模板对象
@Autowired
private RabbitTemplate rabbitTemplate;
//调用RabbitMQ模板发送消息方法,向消息队列发送消息内容
@RequestMapping("/direct/sendMsg")
public String sendMsg(String exchange,String routingkey,String
msg){
rabbitTemplate.convertAndSend(exchange,routingkey,msg);
return "已投递~";
}
}
2.生产者和消费者的yml配置:
# 消息队列服务地址 spring.rabbitmq.host=112.165.150.18 # 消息队列端口 spring.rabbitmq.port=5672 # 消息队列账户 spring.rabbitmq.username=guest # 消息队列账户密码 spring.rabbitmq.password=guest # 设置虚拟主机,不设置默认是根路径(/) spring.rabbitmq.virtual-host=/guest
3.编写Consumer服务用于接收消息队列的消息:
@Component
@RabbitListener(queues = "order.A")
public class ConsumerQueueListener {
//接收消息,监听器调用此方法执行业务逻辑
@RabbitHandler
public void queueListenerHandle(String msg){
System.out.println("下单消息{},内容:"+msg);
}
}
4.用于后面测试:
http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机
1、生产者确认
问题1:生产者能百分之百将消息发送给消息队列!
两种意外情况:
第一,消费者发送消息给MQ失败,消息丢失;
第二,交换机路由到队列失败,路由键写错;
为了确定我的消息发送成功,RabbitMQ为我们引进了两种方式来控制消息的投递可靠性:①/confirm/i确认模式;②return退回模式
- product发送消息到交换机(Exchange),无论成功与否,都会执行一个确认回调的方法/confirm/iCallback。
- exchange到消息队列(queue)投递失败则会执行一个返回方法returnCallback。
利用这两个 callback 控制消息的可靠性投递
1.1、/confirm/i确认模式在配置文件中开启生产者发布消息确认模式:
# 开启生产者确认模式:(/confirm/i),投递到交换机,不论失败或者成功都回调
spring:
rabbitmq:
publisher-confirms = true
编写生产者确认回调方法:
//发送消息回调确认类,实现回调接口/confirm/iCallback,重写其中confirm()方法
@Component
public class Message/confirm/iCallback implements
RabbitTemplate./confirm/iCallback {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack,
String cause) {
if (ack){
System.out.println("消息进入交换机成功{}");
} else {
System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
}
}
}
在RabbitTemplate中,设置消息发布确认回调方法
@Component
public class Message/confirm/iCallback implements
RabbitTemplate./confirm/iCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate(){
//设置消息确认回调方法
rabbitTemplate.set/confirm/iCallback(this::/confirm/i);
}
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack,
String cause) {
if (ack){
System.out.println("消息进入交换机成功{}");
} else {
System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
}
}
}
请求测试:
- 测试成功
http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机
- 测试失败
http://localhost:8080/direct/sendMsg?exchange=order_xxxxxxx&routingkey=order.A&msg=购买苹果手机1.2、return退回模式
消息回退模式特点:消息进入交换机,路由到队列过程中出现异常则执行回调方法
- 在配置文件中,开启生产者发布消息回退模式
# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调 spring.rabbitmq.publisher-returns=true
- 在Message/confirm/iCallback类中,实现接口RabbitTemplate.ReturnCallback
@Component
public class Rabbit/confirm/i implements RabbitTemplate./confirm/iCallback
,RabbitTemplate.ReturnCallback {
//重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("交换机路由至消息队列出错:>>>>>>>");
System.out.println("交换机:"+exchange);
System.out.println("路由键:"+routingKey);
System.out.println("错误状态码:"+replyCode);
System.out.println("错误原因:"+replyText);
System.out.println("发送消息内容:"+message.toString());
System.out.println("<<<<<<<<");
}
}
- 在RabbitTemplate中,设置消息发布回退回调方法
@PostConstruct
public void initRabbitTemplate(){
//设置消息确认回调方法
rabbitTemplate.set/confirm/iCallback(this::/confirm/i);
//设置消息回退回调方法
rabbitTemplate.setReturnCallback(this::returnedMessage);
}
- 请求测试失败执行returnedMessage方法:
http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=xxxxx&msg=购买苹果手机1.3 小结
确认模式:
- 设置publisher-confirms=“true” 开启 确认模式。
- 实现RabbitTemplate./confirm/iCallback接口,重写/confirm/i方法
- 特点:不论消息是否成功投递至交换机,都回调/confirm/i方法,只有在发送失败时需要写业务代码进行处理。
退回模式
- 设置publisher-returns=“true” 开启 退回模式。
- 实现RabbitTemplate.ReturnCallback接口,重写returnedMessage方法
- 特点:消息进入交换机后,只有当从exchange路由到queue失败,才去回调returnedMessage方 法;
问题2:消费者能百分百接收到请求,且业务执行过程中还不能出错!
ack指 Acknowledge,拥有确认的含义,是消费端收到消息的一种确认机制;
1.自动确认:acknowledge=“none”。
当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。
2.手动确认:acknowledge=“manual”。
手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() ,手动签收,如果出现异常,则调用 channel.basicNack() 方法,让其自动重新发送消息。
3.根据异常情况确认:acknowledge=“auto”,(这种方式使用麻烦,不作讲解)。
2.1 消费者确认demo(手动确认)目标:自定义消费者接收消息监听器,监听收到消息的内容,手动进行签收;当业务系统抛出异常则拒绝签收,重回队列
配置yml
# 消息队列服务地址 spring.rabbitmq.host=112.165.150.18 # 消息队列端口 spring.rabbitmq.port=5672 # 消息队列账户 spring.rabbitmq.username=guest # 消息队列账户密码 spring.rabbitmq.password=guest # 设置虚拟主机,不设置默认是根路径(/) spring.rabbitmq.virtual-host=/guest # 配置开启手动签收 # 简单模式的开启手动签收 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 路由模式开启手动签收 spring.rabbitmq.listener.direct.acknowledge-mode=manual # 是否支持重试 spring.rabbitmq.listener.direct.retry.enabled=true
消费者工程中,创建自定义监听器类CustomAckConsumerListener
@Component
@RabbitListener(queues = "order.A")
public class CustomAckConsumerListener {
@RabbitHandler
public void queueListenerHandle(String msg,Message message, Channel channel) throws Exception {
//获取消息内容
System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg);
//获取投递标签
MessageProperties messageProperties =
message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
try {
if (msg.contains("苹果")){
throw new RuntimeException("不允许卖苹果手机!!!");
}
channel.basicAck(deliveryTag,false);
System.out.println("手动签收完成:{}");
} catch (Exception ex){
channel.basicNack(deliveryTag,false,true);
System.out.println("拒绝签收,重回队列:{}"+ex);
}
}
}
测试成功
测试发送消息手动签收,请求地址http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机
测试失败
拒绝签收消息,消息重回队列,请求地址包含苹果,抛出异常:http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机2.2 小结
- 如果想手动签收消息,那么需要自定义实现消息接收监听器,实现ChannelAwareMessageListener接口
- 设置AcknowledgeMode模式
1.none:自动
2.auto:异常模式
3.manual:手动 - 调用channel.basicAck方法签收消息
- 调用channel.basicNAck方法拒签消息
情形一:如果在A系统中需要维护相关的业务功能,可能需要将A系统的服务停止,那么这个时候消息的生产者还是一直会向MQ中发送待处理的消息,消费者此时服务已经关闭,导致大量的消息都会在MQ中累积。当A系统成功启动后,消费者会一次性将MQ中累积的大量的消息拉取到自己的服务,导致服务在短时间内会处理大量的业务,可能会导致系统服务的崩溃。 所以消费端限流是非常有必要的。
情形二:当大量用户注册时,高并发请求过来,邮件接口只支持小量并发,这时消费端限流也非常必要;
消费端限流配置:设置监听器容器属性container.setPrefetchCount(1);表示消费端每次从mq拉去1条消息来消费,直到手动确认消费完毕后,才会继续拉去下一条消息。
基于消费端确认的工程进行演示消费端限流要求, 消费端确认模式必须为手动确认
** 配置每次拉取消息1条**
# 消息队列服务地址 spring.rabbitmq.host=112.165.150.18 # 消息队列端口 spring.rabbitmq.port=5672 # 消息队列账户 spring.rabbitmq.username=guest # 消息队列账户密码 spring.rabbitmq.password=guest # 设置虚拟主机,不设置默认是根路径(/) spring.rabbitmq.virtual-host=/guest # 配置开启手动签收 # 简单模式的开启手动签收 spring.rabbitmq.listener.simple.acknowledge-mode=manual # 路由模式开启手动签收 spring.rabbitmq.listener.direct.acknowledge-mode=manual # 是否支持重试 spring.rabbitmq.listener.direct.retry.enabled=true # 设置消费端限流,每次拉取消息多少条,默认是250条 spring.rabbitmq.listener.direct.prefetch=1
在监听器接收消息方法中,休眠3秒,否则拉取过快,看不出效果
@Component
@RabbitListener(queues = "order.A")
public class CustomAckConsumerListener {
@RabbitHandler
public void queueListenerHandle(String msg,Message message, Channel channel) throws Exception {
//获取消息内容
System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg);
//获取投递标签
MessageProperties messageProperties = message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
try {
//休眠3秒
Thread.sleep(3000);
if (msg.contains("苹果")){
throw new RuntimeException("不允许卖苹果手机!!!");
}
channel.basicAck(deliveryTag,false);
System.out.println("手动签收完成:{}");
} catch (Exception ex){
channel.basicNack(deliveryTag,false,true);
System.out.println("拒绝签收,重回队列:{}"+ex);
}
}
}
重启消费者工程,查看控制面板频道中,每次拉取1消息配置是否生效
多次发送请求进行测试
测试http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机3.2 小结
- 设置每次拉取消息1条spring.rabbitmq.listener.simple.prefetch=2
- 注意,如果想进行消费端限流,那么消息必须手动确认,AcknowledgeMode为MANUAL
TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,会被自动清
除。RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。
给单个消息设置过期时间没实际意义
4.1 demo演示-
在RabbitMQ管理控制台中,新增消息队列 order.B ,并设置消息失效时间为5秒
-
在RabbitMQ管理控制台中,将消息队列 order.B 绑定到交换机 order_exchange 上
-
测试发送消息到消息队列order.B中,该队列没有消费者接收消息
-
等待5秒,消息自动消失
-
设置消息过期时间
- 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
- 由于队列是先进先出的,所以如果设置单个消息的过期时间并没有实际意义
例如:设置消息A的过期时间为10秒,消息B的过期时间为5秒,但是先将消息A发送至队列,那么只有等消息A被消费或者到期移除后才会将消息B消费或者到期移除。
消息丢失,发送失败如何处理?任由消息消失?
死信队列:当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是Dead Letter Exchange(死信交换机 简写:DLX)。
消息成为死信的三种情况:
- 队列消息长度到达限制;
- 消费者拒接消息(basicNack),并且不把消息重新放回源队列,requeue=false;
- 源队列存在消息过期设置,消息到达超时时间未被消费;
设置死信队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
在RabbitMQ管理控制台中,创建死信队列 deadQueue
在RabbitMQ管理控制台中,创建死信交换机 deadExchange
死信队列绑定死信交换机,路由键为 order.dead
消息队列order.B绑定死信交换机
向消息队列 order.B 中发送消息【消息队列order.B中的消息失效时间为5秒】
等待5秒,消息队列order.B中的消息进入死信队列
- 死信交换机和死信队列和普通的没有区别
- 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
- 消息成为死信的三种情况:
队列消息长度到达限制;
消费者拒接消费消息,并且不重回队列;
原队列存在消息过期设置,消息到达超时时间未被消费;
什么是延迟队列?即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
延迟队列应用场景
- 下单后,30分钟未支付,取消订单,回滚车票;2. 新用户注册成功7天后,发送短信问好。
实现方法: 1. 定时器 ;2. 延迟队列
注意:在RabbitMQ中并未提供延迟队列功能。但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
幂等性指同一操作发起的一次请求或者多次请求的结果是一致的。 就像我们去买棒棒糖,第一次买一个五毛,第二次去还是五毛一样,不会因为去买次数多而有不同的价格。 即其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指,消费多条相同的消息,得到与消费该消息一次相同的结果。
使用 乐观锁机制 保证消息的幂等操作原理:
总是假设最好的情况,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法实现。乐观锁适用于多读的应用类型,这样可以提高吞吐量。
第一次执行版本:version=1
-- 先查询再更新:id=1,money=4000,version=1 update account set money=money-500,version=version+1 where id=1 and version=1;
第二次执行:version=2
-- 第二次执行:version=2,相同的SQL语句无法生效 update account set money=money-500,version=version+1 where id=1 and version=1;
第n次执行:version=2,只有第一次执行有效果!因为version为1
update account set money=money-500,version=version+1 where id=1 and version=1;三、RabbitMQ集群搭建
一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案。
1、集群方案原理RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的magic cookie来实现)。因此,RabbitMQ天然支持Clustering。集群是保证可靠性的一种方式,同时可以通过水平扩展以达到增加消息吞吐量能力的目的,这里只需要保证erlang_cookie的参数一致集群即可通信。
2、集群搭建主要参考官方文档:https://www.rabbitmq.com/clustering.html
# docker run 命令解释 docker run --link 可以用来链接2个容器,使得源容器(被链接的容器)和接收容器(主动去链接 的容器)之间可以互相通信。 # -p 映射一个端口 # -v 挂载数据卷 # --name为当前容器设置一个别名 # -di 启动守护式容器 # -it 启动交互式容器 # 进入容器之后执行的命令/bin/bash # -e 设置默认参数 # --hostname 设置当前容器中虚拟机的主机名称 # --link的格式: name:hostname # name是源容器的名称;hostname是源容器在的hostname。2.1、启动三个启动RabbitMQ
# 1.1 启动RabbitMQ1 docker run -d --hostname rabbitmq1 --name=m1 -p 15673:15672 -p 5673:5672 -e RABBITMQ_ERLANG_cookie='rabbitmqcookie' rabbitmq:management # -e 注入参数,RABBITMQ_ERLANG_cookie: erlang_cookie参数,集群中的节点必须保持一致 # 1.2 启动RabbitMQ2 docker run -d --hostname rabbitmq2 --name=m2 -p 15674:15672 -p 5674:5672 -- link m1:rabbitmq1 -e RABBITMQ_ERLANG_cookie='rabbitmqcookie' rabbitmq:management # 1.3 启动RabbitMQ3 docker run -d --hostname rabbitmq3 --name m3 -p 15675:15672 -p 5675:5672 -- link m2:rabbitmq2 --link m1:rabbitmq1 -e RABBITMQ_ERLANG_cookie='rabbitmqcookie' rabbitmq:management2.2、进入RabbitMQ容器m1,重置rabbitmq服务
- 停止rabbitmq服务
- 重置rabbitmq服务
- 启动rabbitmq服务
#进入myrabbiratmq1容器 docker exec -it m1 bash #停止rabbit应用 rabbitmqctl stop_app #重置rabbitmq rabbitmqctl reset #启动rabbit应用 rabbitmqctl start_app2.3、进入RabbitMQ容器m2,加入集群:连接节点1rabbitmq服务
- 停止rabbitmq服务
- 重置rabbitmq服务
- 加入集群:连接节点1rabbitmq服务
- 启动rabbitmq服务
#3.进入myrabbitmq2容器 docker exec -it m2 bash #停止rabbit应用 rabbitmqctl stop_app #重置rabbitmq rabbitmqctl reset #加入集群 rabbitmqctl join_cluster --ram rabbit@rabbitmq1 ## --ram 设置内存节点 #启动rabbit应用 rabbitmqctl start_app2.4、进入RabbitMQ容器m3,加入集群:连接节点1rabbitmq服务
- 停止rabbitmq服务
- 重置rabbitmq服务
- 加入集群:连接节点1rabbitmq服务
- 启动rabbitmq服务
#4.进入myrabbitmq3容器 docker exec -it m3 bash #停止rabbit应用 rabbitmqctl stop_app #重置rabbitmq rabbitmqctl reset #加入集群 硬盘节点 rabbitmqctl join_cluster rabbit@rabbitmq1 #启动rabbit应用 rabbitmqctl start_app2.5、启动完成查看集群状态
#查看集群状态 rabbitmqctl cluster_status3、集群存在问题
上述配置的RabbitMQ集群模式,并不能保证队列的高可用性,尽管交换机绑定队列的内容,可以复制到集群里的任何一个节点,但是队列内容不会复制,队列节点宕机直接导致该队列无法应用,只能等待节点重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,须要创建镜像队列。
镜像队列可以同步queue和message,当主queue挂掉,从queue中会有一个变为主queue来接替工作。
镜像队列是基于普通的集群模式的,所以你还是得先配置普通集群,然后才能设置镜像队列。镜像队列设置后,会分一个主节点和多个从节点,如果主节点宕机,从节点会有一个选为主节点,原先的主节点起来后会变为从节点。
#设置镜像队列命令,随便在一台节点都可以执行
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
然后将主节点停止后测试是否可以正常收发消息



