第三章 `RabbitMQ`高级特性
3.1-消息如何保障100%的投递成功方案3.2-幂等性概念及业界主流解决方案3.3-`/confirm/i`确认消息详解3.4-`Return`消息机制3.5-消费端自定义监听3.6-消费端的限流策略3.7-消费端`ACK`与重回队列机制3.8-`TTL`消息详解3.9-死信队列`DLX`
第三章 RabbitMQ高级特性 3.1-消息如何保障100%的投递成功方案生产端的可靠性投递
保障消息的成功发出保障MQ节点的成功接收发送端收到MQ节点(Broker)确认应答完善的消息进行补偿机制
BAT/TMD大厂解决方案
消息落库,对消息状态进行打标(缺点,在高并发场景下对数据库压力过大)
- 生产者将业务数据和消息入库,status默认为0生产者发送消息到MQ消费者进行消息确认正常消费情况下,update消息状态为1,表示正常消费,如果出现网络闪断等问题,则消息状态为0不变定时任务去数据库查询状态为0的消息针对此类数据,进行数据重发,重复2-3-4-5-6步骤统计重复次数3次以上的消息,不再进行重发,update消息状态为2,等待下一步处理
消息的延迟投递,做二次确认,异步回调检查
1. 生产者(上游服务)先将业务数据,比如订单数据入库,之后再发送消息到MQ中,而且不加事务,因为事务在这影响性能(绿色线) 2. 定时在5分钟后,发送延迟检查消息(红色线) 3. 消费者(下游服务)监听指定队列,消费消息(绿色线) 4. 如果消费者成功接收消息后,将主动发送一条确认消息到MQ(蓝色线) 5. 回调服务监听指定队列,该队列监听消费者发送的确认消息,如果成功收到确认消息,表示消息正常消费,则进行入库处理(黑色线) 6. 回调服务监听指定队列,该队列监听生产者发送的延迟检查消息,5分钟后接收到消息,回调服务在数据库中检查这条消息是否被正常消费,如果一切正常,则不返回消息(红色线) 7. 如果这条消息未正常消费,则回调服务调用RPC服务(rpc-远程服务调用),通知上游服务(生产者)进行重发消息(紫色线)3.2-幂等性概念及业界主流解决方案
引入
借鉴数据库乐观锁机制:执行一条更新库存SQL
//高并发情况下,同时两条数据更新语句,通过version来预防超卖 update T_goods set count = count -1 ,version = version +1 where version = 1
消费端实现幂等,就表示,即使消费端收到多条一样的消息,永远只会消费一次
消费端幂等性保障
场景:在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
- 消费端实现幂等,就表示,即使消费端收到多条一样的消息,永远只会消费一次
业务主流幂等性操作
唯一ID + 指纹码 机制,利用数据库主键去重
//指纹码,可以是时间戳,可以是区别码,id为主键唯一 select count(1) from t_order where id = 唯一id+指纹码 //查询这个主键是否有值,无则插入数据,有则说明消息已消费入库 //好处:实现简单 //缺点:高并发下有数据写入的性能瓶颈 //解决方案:根据ID进行分库分表进行算法路由,进行分压分流
利用Redis的原子性去实现
使用redis进行幂等, 利用redis set 之后,判断isexist()判断是否存在 另外需要考虑问题 1.我们是否要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存之间如何做到原子性?数据一致性 2.如果不进行落库,那么都存储到缓存中,如何设置定时同步的策略?
理解/confirm/i消息确认机制
消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给生产者一个应答
生产者接收应答,用来确认这条消息是否正常发送到Broker,这种方式也是RabbitMQ消息的可靠性投递的核心保障
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OXi7Wl6e-1645578206551)(img/confirm/i消息确认.png)]
实现/confirm/i确认消息
- 在channel上开启确认模式,channel.confirmSelect()在channel上添加监听,add/confirm/iListener,监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理
代码演示
// 生产者端
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("118.126.65.50");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel./confirm/iSelect();
String exchangeName = "test_/confirm/i_exchange";
String routingKey = "test_routing_key";
String exchangeType = "direct";
String queueName = "test_/confirm/i_queue";
String msg = "confirm message";
channel.exchangeDeclare(exchangeName,exchangeType,true,false,null);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routingKey);
channel.basicPublish(exchangeName,routingKey,null,msg.getBytes());
channel.add/confirm/iListener(new /confirm/iListener() {
// broker (消息代理,就是MQ) 当消息投递到了所匹配的队列之后,broker就会发送一个确认信号,给到生产者
@Override
public void handleAck(long l, boolean b) throws IOException {
System.out.println("======有ack=========");
}
@Override
public void handleNack(long l, boolean b) throws IOException {
// 没有ack情况,磁盘占满、队列满等
System.out.println("======没有ack=========");
}
});
}
// 消费者端
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("118.126.65.50");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_/confirm/i_exchange";
String routingKey = "test_routing_key";
String queueName = "test_/confirm/i_queue";
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息-" +new String(body));
}
};
channel.basicConsume(queueName,true,consumer);
}
机制理解
Return Listener用于处理一些不可路由的消息
生产者通过指定一个Exchange和RoutingKey,把消息送到某个队列中,然后消费者监听队列,进行消费处理
在某些情况下,如果生产者在发送消息时,当前的Exchange不存在或者指定的RoutingKey路由不到指定队列,这个时候要监听这种不可达消息,就要使用Return Listener
基础API配置
Mandatory:如果是true,则监听器会接收到路由不可达的消息,然后进行后续处理;如果是false,那么broker会自动删除该消息 (翻译:adj强制的,n代理人)
参数设置 public void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
channel.addReturnListener(new ReturnListener() {
//replyCode-响应码
//replyText-响应文本
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
System.out.println(new String(bytes));
}
});
//如果消息正常发出,或者设置成false,则return listener不会监听,只有设置成true时才会监听错误返回
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,true,properties,msg.getBytes());
replyCode:312
replyText:NO_ROUTE
exchange:exchange-01
routingKey:routingKey-02
body:hello RabbitMq return
//4-创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("msg-"+new String(body));
}
};
//5-消费数据-队列名称,设置是否自动签收,消费者
channel.basicConsume(QUEUE_NAME,true,consumer);
3.6-消费端的限流策略
场景引入
RabbitMQ服务器有1万条未处理的消息,此时打开消费端服务器,则会出现巨量消息瞬时全部推送,单个消费者端无法同时处理
解决方案
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数量的消息未被确认前(通过基于consumer或者channel设置Qos的值),不进行消费新的数据
消费端一定不要设置成自动签收,而是手动签收
public void basicQos(int prefetchSize, int prefetchCount, boolean global) //prefetchSize:单条消息大小限制,一般设置0表示不限制 //prefetchCount:一般是1,告诉RabbitMQ不要同时给一个消费者推送多余N个消息,如果N个消息返回了ack,则继续发送消息,如果没有则不要继续发送消息 //global:一般false。true表示应用到channel级别,false表示应用到consumer级别,
代码示例
// 生产端不变
// 消费者 要改变
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("msg-"+new String(body));
//public void basicAck(long deliveryTag, boolean multiple)
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicQos(0, 1, false);
//限流方法,一定要将autoAck设置成false,表示手动签收
channel.basicConsume(QUEUE_NAME,false,consumer);
消费端手工ACK和NACK(返回ack表示成功消费,nack表示没有正常消费)
消费端进行消费,如果超过count次数后,一直返回nack,则可能是由于业务逻辑异常导致,此时将消息添加到日志中,然后进行补偿发送如果由于服务器宕机等严重问题,那就需要手工进行ACK保障消费端消费成功
消费端重回队列
消费端重回队列,是为了没有处理成功的消息,把消息重新传递给Broker,将该条消息放入此队列尾端,重新发送
一般实际使用中,都会关闭重回队列,设置成false
//public void basicNack(long deliveryTag, boolean multiple, boolean requeue) // 消息标签,是否批量,是否重回队列 channel.basicNack(envelope.getDeliveryTag(), false, false);
代码演示
// 生产端代码
for (int i = 0; i < 5; i++) {
String msg = "hello RabbitMq " + i;
Map header = new HashMap<>();
header.put("num",i);
// 设置自定义参数
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(header)
.build();
// 发布消息
channel.basicPublish("exchange-01","routingKey-01",true,properties,msg.getBytes(StandardCharsets.UTF_8));
}
// 消费端代码
// 3.创建信道
Channel channel = connection.createChannel();
// 4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 进行模拟判断,如果自定义属性的header头中,num参数为0,则返回重回队列中,不进行消费
if((Integer) properties.getHeaders().get("num") == 0){
// 消息唯一标签,是否批量处理,是否返回重回队列
// 观察管控台和控制台,会发现一直打印第0个消息,因为一直在重回队列中不停的发送,一直没有消费,管控台上unacked一直有该条数据
channel.basicNack(envelope.getDeliveryTag(),false,true);
}else{
channel.basicAck(envelope.getDeliveryTag(),false);
}
System.out.println("消息-"+new String(body));
}
};
// 5. 消费数据,将第二个参数autoACK设置为false,表示不进行自动签收,而是手动签收
channel.basicConsume("queue-01",false,consumer);
- TTL(time to live)生存时间
RabbitMQ支持消息的过期时间,在消息发送时可以进行指定RabbitMQ支持队列中消息过期时间,从消息进入队列开始计算,只要超过了队列的超时时间配置,则消息自动清除
死信队列 (DLX dead-letter-exchange)
利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLXDLX也是一个正常的Exchange,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置了某个队列的属性当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的DLX死信队列上,进而被路由到另一个队列可以监听这个队列中消息进行相应的处理,这个特性可以弥补RabbitMQ3.0之前支持的immediate参数功能
消息变成死信的情况
消息被拒绝(basic.reject/basic.nack)并且requeue = false(上面重回队列的设置参数,false表示不再重回队列)TTL过期队列达到最大长度
死信队列设置
首先需要设置死信队列的Exchange和Queue,然后进行绑定
Exchange 比如 dlx.exchangeQueue 比如 dlx.queueRoutingKey #
然后正常声明交换机、队列、绑定,只不过我们需要在队列上加上一个参数即可
arguments.put("x-dead-letter-exchange","dlx.exchange")
代码演示
- 消费端,创建正常的交换机、队列、并绑定,再创建死信交换机、死信队列、绑定,同时正常队列要设置参数绑定启动消费端,通过管控台,查看是否正常创建队列和交换机关闭消费端,模拟死信队列消息超时的情况生产端,发布消息,但设置消息过期时间为10s启动生产端,会发现5条消息一直堆积在commonQueue中,当消息过期后,5条消息自动转移到dlx.queue中,模拟成功,代码如下:
// 消费端
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.创建信道
Channel channel = connection.createChannel();
// 4.创建交换机
String commonExchangeName = "commonExchange";
String commonExchangeType = "topic";
String commonQueueName = "commonQueue";
String commonRoutingKey = "common.#";
// 交换机,交换机类型,是否持久化,是否自动删除,自定义参数
channel.exchangeDeclare(commonExchangeName,commonExchangeType,true,false,null);
// 5.创建队列
// 队列,是否持久化,是否排他性,是否自动删除,自定义参数
HashMap arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","dlx.exchange");
channel.queueDeclare(commonQueueName,true,false,false,arguments);
// 6.交换机与队列绑定
channel.queueBind(commonQueueName,commonExchangeName,commonRoutingKey);
// 7.声明死信队列
String dlxExchangeName = "dlx.exchange";
String dlxExchangeType = "topic";
String dlxQueueName = "dlx.queue";
String dlxRoutingKey = "#";
channel.exchangeDeclare(dlxExchangeName,dlxExchangeType,true,false,null);
channel.queueDeclare(dlxQueueName,true,false,false,null);
channel.queueBind(dlxQueueName,dlxExchangeName,dlxRoutingKey);
// 4.创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消息-"+new String(body));
}
};
// 5. 消费数据 队列名,是否自动签收,消费者对象
channel.basicConsume(commonQueueName,false,consumer);
// 8.关闭连接
//channel.close();
//connection.close();
}
}
// 生产端
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.创建信道
Channel channel = connection.createChannel();
String commonExchangeName = "commonExchange";
String commonRoutingKey = "common.#";
// 4.发送数据 (String exchange, String routingKey, BasicProperties props, byte[] body)
for (int i = 0; i < 5; i++) {
String msg = "hello RabbitMq DLX" + i;
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
channel.basicPublish(commonExchangeName,commonRoutingKey,true,properties,msg.getBytes(StandardCharsets.UTF_8));
}
// 5.关闭连接
//channel.close();
//connection.close();
}
}



