1. 简介 1. 消息队列版本:3.8.8
MQ(messagequeue),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
2. 功能 2.1 流量削峰 在高流量的情况下,消息队列可以作为一个缓冲的中间件,将高峰的流量消息存入消息队列中,直接返回结果,然后慢慢处理消息队列里面的数据。这样可以防止短时间内有大量的请求访问服务器,极大的缓解了服务器的压力,达到了流量削峰的效果。
2.1 耦合指的是当你实现某个功能的时候(例如A发消息给B,或者A有任务要调用B去执行),直接调用需要的接口。而消息队列则是将相应的消息或任务存入消息队列中,即使其中一个接口出现问题,也不会影响另一方的功能使用。
2.3 异步处理不同于传统的同步处理,不需要等消息处理完才返回处理结果,而是先返回处理结果,再由消息对应的处理接口从队列中拉取消息进行处理(消费)。
3. MQ分类 3.1 ActiveMQ- 优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较 低的概率丢失数据
- 缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件, 以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥 着举足轻重的作用。目前已经被 linkedIn,Uber, Twitter, Netflix 等大公司所采纳。
- 优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ
- 缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在MQ 核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
2007 年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最 主流的消息中间件之一。
- 优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易 用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、Actionscript、XMPP、STOMP 等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
- 缺点:商业版需要收费,学习成本较高
主要特点是基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能, 肯定是首选 kafka 了。
4.2 RocketMQ天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削 峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务 场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
4.3 RabbitMQ结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分 方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。
5. RabbitMQ简介RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包 裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据
5.1 四大核心概念 1. 生产者产生数据发送消息的程序是生产者
2. 交换机交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
3. 队列队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
4. 消费者消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
5.2 RabbitMQ核心部分六大模式
工作原理图
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。
Channel:是在 connection 内部建立的逻辑连接,如果应用程 序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据
2. 安装RabbitMQ是采用 Erlang语言开发的,所以系统环境必须提供 Erlang环境,需要是安装 Erlang,所以需要先安装Erlang
- erlang官网:https://www.erlang.org/
- rabbit官网:https://www.rabbitmq.com/download.html
注:Erlang和RabbitMQ不同版本可能是不兼容的,需要对应版本才能使用。版本对照:https://www.rabbitmq.com/which-erlang.html
这里提供对应的版本,直接安装就行
链接:https://pan.baidu.com/s/10Y6Bc72m3SRlYexIguAK8Q
提取码:yyds
安装步骤
1. rpm -ivh erlang-21.3-1.el7.x86_64.rpm 2. yum install socat -y 3. rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
启动服务
/sbin/service rabbitmq-server start
安装web插件
- 停止服务
/sbin/service rabbitmq-server stop
- 开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
- 访问:ip+15276
-
需要权限账户,因此需要创建一个用户
# 创建账号 rabbitmqctl add_user admin 123 # 设置用户角色 rabbitmqctl set_user_tags admin administrator # 设置用户权限: set_permissions [-p
] (用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限当前用户和角色) rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" # 当前用户和角色 rabbitmqctl list_users
端口介绍
- 4369 – erlang发现口
- 5672 – client端通信口
- 15672 – 管理界面ui端口
- 25672 – server间内部通信口
简单模式只有一个队列,一个消费者,一个生产者。
pom.xml
RabbitMQ_Test org.example 1.0-SNAPSHOT 4.0.0 Demo01 8 8 com.rabbitmq amqp-client 5.8.0 commons-io commons-io 2.6 org.apache.maven.plugins maven-compiler-plugin 8 8
生产者
package com.zhan.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String QUEUE_NAME = "mq";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的ip");
//factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
System.out.println("生产者连接成功!");
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false,false, false,null);
String message = "hello RabbitMQ";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完毕!");
}
}
消费者
package com.zhan.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String QUEUE_NAME = "mq";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的ip");
//factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("消费者连接成功!");
System.out.println("等待消费消息……");
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("接收到消息:" + new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTag ->{
System.out.println("消费者消费失败");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback , cancelCallback);
}
}
3.2 工作模式:Work Queue
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
工作模式是一个队列存在多个消费者,此时任务会分发给消费者处理,消费者们并行处理任务。
3.11. 消息应答 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收 到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。
1. 自动应答: 消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失 了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当 然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使 得内存耗尽,最终这些消费者线程被操作系统杀死,所以**这种模式仅适用在消费者可以高效并以 某种速率能够处理这些消息的情况下使用。**
2. 手动应答 默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。
1. 应答方法// RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了 1. Channel.basicAck(tag, multiple); // 当处理消息时异常中断, 可以选择让消息重回队列重新发送。nack 操作可以是消息重回队列, 可以使用 basicNack() 方法: 2. Channel.basicNack(tag, multiple, requeue) // 用于否定确认 3. Channel.basicReject(tag, requeue)// 用于否定确认 // 与 Channel.basicNack 相比少一个Multiple参数。不处理该消息了直接拒绝,可以将其丢弃了2. Multiple参数:是否批量应答
-
true 代表批量应答 channel 上未应答的消息
比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
-
false 同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
总结: 开启手动确认之后,就算消费者在执行任务期间宕机,任务消息也不会丢失,在没有获得确认回执之前队列是不会删除消息的,但是开启手动回执切记消费者在处理消息之后要确认回执,防止已经处理的消息没有被删除,依然占据在队列当中,消耗大量内存。
# 查询工作队列中未确认消息的数量 rabbitmqctl list_queues name messages_ready messages_unacknowledged3.12 合理分发消息策略:
rabbitMq将消息一次性均匀发送给消费者,不管消费者处理性能如何,都均匀发放。但是我们可以使用 channel.basicQos(i) 方法, 这告诉rabbitmq一次只向消费者发送 i 条消息, 在返回确认回执前, 不要向消费者发送新消息. 而是把消息发给下一个空闲的消费者
3.13 消息重新入队 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队(入队到队尾)。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔宕机,也可以确 保不会丢失任何消息。
3.3 发布订阅模式 3.41. 交换机(Exchange) 概念:生产者生产的消息从不会直接发送到队列,而是直接发给交换机。交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就得由交换机的类型来决定。
类型- 直接(direct)
- 主题(topic)
- 标题(headers)
- 扇出(fanout)
前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的 原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话
3.42 临时队列-
特点:断开了消费者的连 接,队列将被自动删除。
-
创建方式:
String queueName = channel.queueDeclare().getQueue();
绑定指的是交换机和队列连接的过程,它它告诉我们 exchange 和那个队 列进行了绑定关系,比如下图:一个交换机 X 绑定 Q1、Q2两个队列
3. 44 fanout exchange 介绍:- 它是将接收到的所有消息广播到它知道的所有队列中
下列介绍一个例子,我们让一个消费者将消息打印到控制台,另一个消费者将消息持久化到日志文件中
生产者
package com.zhan.demo02;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Producer {
//定义交换机名字
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
//使用fanout交换机,设置交换机名
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
System.out.println("请输入信息:");
while (scanner.hasNextLine()){
String message = scanner.nextLine();
//队列名为空,即随机队列
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8) );
}
}
}
消费者1号
package com.zhan.demo02;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connections = RabbitMqUtils.getConnection();
Channel channel = connections.createChannel();
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, EXCHANGE_NAME, "");
System.out.println("等待接收消息:");
DeliverCallback deliverCallback = (consumerTag, message)->{
//打印消息
System.out.println( "打印到控制台上:"+new String(message.getBody(), StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("取消消费:" + consumerTag);
};
channel.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
消费者2号
package com.zhan.demo02;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.zhan.utils.RabbitMqUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, EXCHANGE_NAME,"");
System.out.println("等待接收消息:");
DeliverCallback deliverCallback = (consumerTag, message)->{
File file = new File("a.log");
FileOutputStream fileOutputStream = new FileOutputStream(file,true);
fileOutputStream.write(message.getBody());
fileOutputStream.write("rn".getBytes(StandardCharsets.UTF_8));
};
CancelCallback cancelCallback = consumerTag ->{
};
channel.basicConsume(queue, true, deliverCallback, cancelCallback);
}
}
3.45 Direct exchange
Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。
如下图,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green. 在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black、green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
多重绑定: 当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多
测试代码
生产者
package com.zhan.demo03;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Producer {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
// 设置交换机类型,名字
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
HashMap map = new HashMap<>();
map.put("info", "普通info信息");
map.put("warn", "warn信息");
map.put("error", "error信息");
map.put("debug", "debug信息");
for (Map.Entry entry : map.entrySet()) {
String bindingKey = entry.getKey();
String message = entry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息成功:"+ "{"+ bindingKey +":" + message + "}");
}
}
}
消费者1号
package com.zhan.demo03;
import com.rabbitmq.client.*;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
channel.queueDeclare(queueName, false, false, false,null);
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warn");
System.out.println("等待接收消息……");
DeliverCallback deliverCallback = (ConsumerTag, delivery)->{
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println((" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费失败!" + consumerTag);
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
消费者2号
package com.zhan.demo02;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.zhan.utils.RabbitMqUtils;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
String queueName = "log";
channel.queueDeclare(queueName, false, false, false,null);
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println("等待接收消息……");
DeliverCallback deliverCallback = (consumerTag, message)->{
File file = new File("a.log");
FileOutputStream fileOutputStream = new FileOutputStream(file,true);
fileOutputStream.write(message.getBody());
fileOutputStream.write("rn".getBytes(StandardCharsets.UTF_8));
System.out.println("消息写入文件成功!");
};
CancelCallback cancelCallback = consumerTag ->{
System.out.println("消费失败!" + consumerTag);
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
- **注意:需要先启动消费者**通过 routing_key绑定将交换机和队列绑定,否则生产者的将消息发送给交换机,交换机因为无相应的队列绑定,而丢弃消息。后续启动的消费者将无法接收到前面被丢弃的信息
- 从这个例子看可以看出,direct 交换机可以实现选择性地接收消息
- 局限性:
- 比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型
Topic交换机依赖于 routing_key的格式来区分队列,因此对 routing_key的格式具有一定的要求:
-
它必须是一个单词列表,单词可以任意,但是必须以点号分隔开。比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”(这个单词列表最多不能超过 255 个字节)
-
替换符:
-
*(星号):可以代替一个单词
-
#(井号):可以替代零个或多个单词(# 作为routingKey即匹配所有)
-
值得注意的是:
- 当一个队列绑定键是 # ,那么这个队列将接收所有数据,就有点像 fanout 了
- 如果队列绑定键当中没有 # 和 * 出现,那么该队列绑定类型就是 direct 了
测试代码
生产者
package com.zhan.topicDemo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Producer {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
HashMap map = new HashMap<>();
map.put("info.base", "info.base信息");
map.put("info.advantage", "advantage信息");
map.put("balabala.debug", "error.warn信息");
map.put("warn", "error.warn信息");
for (Map.Entry entry : map.entrySet()) {
String routingkey = entry.getKey();
String message = entry.getValue();
channel.basicPublish(EXCHANGE_NAME, routingkey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息成功:"+ "{"+ routingkey +":" + message + "}");
}
}
}
消费者1号
package com.zhan.topicDemo;
import com.rabbitmq.client.*;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Consumer01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "info";
channel.queueDeclare(queueName, false ,false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME,"info.*");
System.out.println("等待接收消息……");
DeliverCallback deliverCallback = (ConsumerTag, delivery)->{
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println((" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费失败!" + consumerTag);
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
消费者2号
package com.zhan.topicDemo;
import com.rabbitmq.client.*;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Consumer02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "all";
channel.queueDeclare(queueName, false ,false, false, null);
//匹配所有
channel.queueBind(queueName, EXCHANGE_NAME,"#");
System.out.println("等待接收消息……");
DeliverCallback deliverCallback = (ConsumerTag, delivery)->{
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println((" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费失败!" + consumerTag);
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
消费者3号
package com.zhan.topicDemo;
import com.rabbitmq.client.*;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Consumer03 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "debug";
channel.queueDeclare(queueName, false ,false, false, null);
//匹配所有
channel.queueBind(queueName, EXCHANGE_NAME,"*.debug");
System.out.println("等待接收消息……");
DeliverCallback deliverCallback = (ConsumerTag, delivery)->{
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println((" 接收绑定键 :" + delivery.getEnvelope().getRoutingKey() + ", 消息:" + message));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消费失败!" + consumerTag);
};
channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
}
}
上面的代码运行后的结果可以看出,
- Consumer01只接收到了routimgKey是以info为开头的消息
- Consumer02接收到全部信息
- Consumer03接收到routimgKey是以以debug为结尾的消息
默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化。
4.1 队列持久化[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OvcU4b8m-1632496311640)(C:Usersmrwu-AppDataRoamingTyporatypora-user-imagesimage-20210911133821271.png)]
队列持久化需要在创建队列的时候就设定,不能修改,否则会报错。
生产者代码
package com.zhan.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Producer {
private static final String QUEUE_NAME = "mq";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的ip");
//factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
System.out.println("生产者连接成功!");
Channel channel = connection.createChannel();
boolean durable = true;//开启队列持久化
channel.queueDeclare(QUEUE_NAME, durable,false, false,null);
String message = "hello RabbitMQ";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送完毕!");
}
}
消费者代码
package com.zhan.mq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
private static final String QUEUE_NAME = "mq";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("你的ip");
//factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("消费者连接成功!");
System.out.println("等待消费消息……");
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("接收到消息:" + new String(message.getBody()));
};
CancelCallback cancelCallback = consumerTag ->{
System.out.println("消费者消费失败");
};
//boolean ack = false;
channel.basicConsume(QUEUE_NAME, true, deliverCallback , cancelCallback);
}
}
4.2 消息持久化可以在Web端看到持久化的队列
修改生产者生产消息方法的参数:
// MessageProperties.PERSISTENT_TEXT_PLAIN :持久化
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
5. 死信队列
死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有 后续的处理,就变成了死信,有死信自然就有了死信列。
- 应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
1. 消息TTL过期 生产者死信来源三种情况测试
package com.zhan.deadQueueDemo;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Producer {
private static final String NORNAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(NORNAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 交换机参数
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 0; i < 10; i++) {
String message = "info" + i;
channel.basicPublish(NORNAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8));
System.out.println("已发送消息:" + message);
}
}
}
消费者1号
package com.zhan.deadQueueDemo;
import com.rabbitmq.client.*;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Consumer01 {
private static final String NORNAL_EXCHANGE = "normal_exchange";
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(NORNAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String dead_queue = "dead_queue";
channel.queueDeclare(dead_queue, false, false, false, null);
channel.queueBind(dead_queue, DEAD_EXCHANGE, "lisi");
HashMap map = new HashMap<>();
//正常队列绑定死信队列信息
map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//正常队列设置死信 routing-key 参数 key 是固定值
map.put("x-dead-letter-routing-key", "lisi");
//设置消息过期时间10s
map.put("x-message-ttl",10000);
String normal_queue = "normal_queue";
channel.queueDeclare(normal_queue, false, false, false, map);
channel.queueBind(normal_queue, NORNAL_EXCHANGE, "zhangsan");
System.out.println("等待接受消息……");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
};
channel.basicConsume(normal_queue, true, deliverCallback, consumerTag -> { });
}
}
消费者2号
package com.zhan.deadQueueDemo;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Consumer02 {
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String dead_queue = "dead_queue";
channel.queueDeclare(dead_queue, false, false, false, null);
channel.queueBind(dead_queue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接受消息……");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息"+message);
};
channel.basicConsume(dead_queue, true, deliverCallback, consumerTag -> { });
}
}
- 先启动consumer01,然后立即断开(这一步是为了模拟正常队列接受不到信息,消息过期进入死信队列)
- 接着启动Producer,可以看到过了10s后消息从正常队列进入到死信队列
- 启动Consumer02,消费死信队列消息
消息生产者代码去掉 TTL 属性
package com.zhan.deadQueueDemo;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Producer {
private static final String NORNAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(NORNAL_EXCHANGE, BuiltinExchangeType.DIRECT);
for (int i = 0; i < 10; i++) {
String message = "info" + i;
channel.basicPublish(NORNAL_EXCHANGE, "zhangsan", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("已发送消息:" + message);
}
}
}
消费者1号
package com.zhan.deadQueueDemo;
import com.rabbitmq.client.*;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Consumer01 {
private static final String NORNAL_EXCHANGE = "normal_exchange";
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(NORNAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String dead_queue = "dead_queue";
channel.queueDeclare(dead_queue, false, false, false, null);
channel.queueBind(dead_queue, DEAD_EXCHANGE, "lisi");
HashMap map = new HashMap<>();
map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key", "lisi");
//设置队列最大长度
map.put("x-max-length", 6);
String normal_queue = "normal_queue";
channel.queueDeclare(normal_queue, false, false, false, map);
channel.queueBind(normal_queue, NORNAL_EXCHANGE, "zhangsan");
System.out.println("等待接受消息……");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer01 接收到消息"+message);
};
channel.basicConsume(normal_queue, true, deliverCallback, consumerTag -> { });
}
}
消费者2号
代码不变……
运行结果
运行步骤跟上面一样,运行结果是,正常队列消息存储6条后队满,剩下的消息进入死信队列,给死信队列消费者消费。
3. 消息被拒 生产者代码不变
消费者1号package com.zhan.deadQueueDemo;
import com.rabbitmq.client.*;
import com.zhan.utils.RabbitMqUtils;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("all")
public class Consumer01 {
private static final String NORNAL_EXCHANGE = "normal_exchange";
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitMqUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(NORNAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String dead_queue = "dead_queue";
channel.queueDeclare(dead_queue, false, false, false, null);
channel.queueBind(dead_queue, DEAD_EXCHANGE, "lisi");
HashMap map = new HashMap<>();
map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key", "lisi");
//map.put("x-message-ttl",10000);
map.put("x-max-length", 6);
String normal_queue = "normal_queue";
channel.queueDeclare(normal_queue, false, false, false, map);
channel.queueBind(normal_queue, NORNAL_EXCHANGE, "zhangsan");
System.out.println("等待接受消息……");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
if("info5".equals(message)){
System.out.println("Consumer01 接收到消息"+message+", 但是拒收!");
//requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
}else {
System.out.println("Consumer01 接收到消息"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(normal_queue, false, deliverCallback, consumerTag -> { });
}
}
消费者2号
代码不变
测试结果
运行步骤跟上面一样,运行结果是,正常队列消息正常被消费,只有info5时被拒收,消息进入死信队列,给死信队列消费者消费。
6. 延迟队列延时队列:队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
使用场景- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果 数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支 付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,定时任务的间隔轮询机制不适合,如:“订单十 分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万 级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单 的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
6.1 RabbitMQ 中的 TTLTTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这 条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
消息设置TTLrabbitTemplate.convertAndSend("X", "XB", "消息"+message,correlationData->{
String ttlTime = "10000";
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
队列设置TTL
//声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
两者区别
- 队列ttl属性:一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队 列中)
- 消息ttl属性:消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者 之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间
另外,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以 直接投递该消息到消费者,否则该消息将会被丢弃。
6.2 ttl+死信队列实现延迟队列使用springboot整合消息队列
pom配置properties文件4.0.0 com.example rabbitmq 0.0.1-SNAPSHOT rabbitmq Demo project for Spring Boot 1.8 UTF-8 UTF-8 2.3.7.RELEASE io.springfox springfox-swagger2 2.9.2 io.springfox springfox-swagger-ui 2.9.2 org.springframework.boot spring-boot-starter-amqp com.alibaba fastjson 1.2.76 org.springframework.boot spring-boot-starter-web org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.junit.vintage junit-vintage-engine org.springframework.boot spring-boot-dependencies ${spring-boot.version} pom import org.apache.maven.plugins maven-compiler-plugin 3.8.1 1.8 1.8 UTF-8 org.springframework.boot spring-boot-maven-plugin 2.3.7.RELEASE com.zhan.rabbitmq.RabbitmqApplication repackage repackage
# 应用名称 spring.application.name=rabbitmq # 应用服务 WEB 访问端口 server.port=8080 # rabbitmq配置 spring.rabbitmq.host=rabbitmq所在的云服务器地址 spring.rabbitmq.port=5672 spring.rabbitmq.password=admin spring.rabbitmq.username=adminswagger配置类·
package com.zhan.rabbitmq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.documentationType;
import springfox.documentation.spring.web.plugins.ApiSelectorBuilder;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public ApiSelectorBuilder webApiConfig(){
return new Docket(documentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select();
}
private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("salvation", "www.salvation.com","salvation@qq.com"))
.build();
}
}
架构图
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:
队列,交换机绑定配置package com.zhan.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
//交换机
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
//死信交换机
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA(){
Map map = new HashMap<>(3);
map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
map.put("x-dead-letter-routing-key", "YD");
map.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//声明队列 B ttl 为 40s 并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB(){
Map args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由 key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的 TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列 B 绑定 X 交换机
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
//声明死信队列 QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列 QD 绑定关系
@Bean
public Binding deadLetterBindingQD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
生产者
package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RequestMapping("ttl")
@RestController
public class Producer {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("sendMassge/{message}")
public void sendMessage(@PathVariable("message") String message){
log.info("当前时间:{},发送一条信息给两个 TTL 队列:{}", new Date(), message);
rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的TTL队列:"+message);
rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的TTL队列:"+message);
}
}
死信队列消费者
package com.zhan.rabbitmq.proAndCon;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Date;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg);
}
}
访问:http://localhost:8080/ttl/sendMassge/salvation
运行结果
6.3 延时队列优化上面架构有个缺点就是,一个队列只能中的消息过期时间都是相同的,如果我们想有多个过期消息,就必须使用多个队列,这样并不方便。可以通过定义一个普通队列设置消息的过期时间来个性化定制消息过期时间。
缺点
如果使用在消息属性上设置 TTL 的方式,消 息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
6.4 Rabbitmq 插件实现延迟队列 1. 下载rabbitmq_delayed_message_exchange插件2. 安装插件下载地址: https://www.rabbitmq.com/community-plugins.html
-
上传到云服务器
-
解压到rabbitmq安装目录下的plugins目录下
- plugins目录:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
-
安装插件
- rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-
重启rabbitmq
- /sbin/service rabbitmq-server stop
- /sbin/service rabbitmq-server start
-
查看web界面多了一个交换机选项
一个延迟队列,一个自定义队列,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并 不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中
2. 队列、交换机绑定配置package com.zhan.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayQueueConfig {
public static final String DELAY_QUEUE_NAME = "delay_queue";
public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
public static final String DELAY_ROUTING_KEY = "delay";
@Bean(name = "delayedQueue")
public Queue delayQueue(){
return new Queue(DELAY_QUEUE_NAME);
}
@Bean(name = "delayedExchange")
public CustomExchange delayExchange(){
Map param = new HashMap<>(1);
param.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, param);
}
@Bean
public Binding bindingDelayQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY).noargs();
}
}
3. 生产者
package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
@Slf4j
@RestController
public class DelayedQueueProducer {
public static final String DELAY_EXCHANGE_NAME = "delay_exchange";
public static final String DELAY_ROUTING_KEY = "delay";
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/delayed/{message}/{delayedTime}")
public void sendMsg(@PathVariable("message")String message, @PathVariable("delayedTime") Integer delayedTime){
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_ROUTING_KEY, message, correlationData->{
correlationData.getMessageProperties().setDelay(delayedTime);
return correlationData;
});
log.info("当前时间:{},发送一条延迟{}毫秒的消息给delayed.queue:{}",new Date(), delayedTime, message);
}
}
4. 消费者
package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Slf4j
@Component
public class DelayedQueueConsumer {
public static final String DELAY_QUEUE_NAME = "delay_queue";
@RabbitListener(queues = DELAY_QUEUE_NAME)
public void receiveDelayedQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);
}
}
发送消息
依次访问:
- http://localhost:8080/delayed/法外狂人王五/40000
- http://localhost:8080/delayed/法外狂人李六/10000
运行结果
可以看出延迟时间段短的第二个消息被先消费掉了
6.5 总结 延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:**消息可靠发送、消息可靠投递、死信队列**来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
7. 发布确认高级 在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败, 导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢? 特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢:
7.1 springboot实现发布确认 1. 确认模式设置spring.rabbitmq.publisher-/confirm/i-type=correlated参数
- NONE:禁用发布确认模式,是默认值
- CORRELATED:发布消息成功到交换器后会触发回调方法
- SIMPLE:经测试有两种效果
- 其一:CORRELATED 值一样会触发回调方法
- 其二:在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法 等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker
package com.zhan.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.BindingResult;
@Configuration
public class Publish/confirm/iConfig {
public static final String EXCHANGE_NAME = "publish_/confirm/i";
public static final String QUEUE_NAME = "queue";
public static final String ROUTING_KEY = "key";
@Bean("/confirm/iQueue")
public Queue /confirm/iQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
@Bean("directExchange")
public DirectExchange directExchange(){
return new DirectExchange(EXCHANGE_NAME);
}
@Bean
public Binding queueBinding(@Qualifier("/confirm/iQueue")Queue queue, @Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(queue).to(directExchange).with(ROUTING_KEY);
}
}
3. 生产者
package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
@Slf4j
@RestController
@RequestMapping("/confirm/i")
public class /confirm/iProducer {
public static final String EXCHANGE_NAME = "publish_/confirm/i";
public static final String ROUTING_KEY = "key";
public static final String ROUTING_KEY_2 = "key2";//没有连接队列的路由键
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack callBack;
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init(){
rabbitTemplate.set/confirm/iCallback(callBack);
}
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message){
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message+":"+ROUTING_KEY,new CorrelationData("1"));
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_2, message+":"+ROUTING_KEY_2,new CorrelationData("2"));
log.info("发送消息:{}", message);
}
}
4. 回调接口
package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate./confirm/iCallback {
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
String id = "";
if(correlationData != null){
id = correlationData.getId();
}
if(ack){
log.info("交换机收到id为:{}的消息", id);
}else {
log.info("交换机未收到id为:{}的消息,原因是:{}", id, cause);
}
}
}
5. 消费者
package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class /confirm/iConsumer {
public static final String QUEUE_NAME = "queue";
@RabbitListener(queues = QUEUE_NAME)
public void getMessage(Message message){
String msg = new String(message.getBody());
log.info("接收到队列"+QUEUE_NAME+"的消息:{}", msg);
}
}
6. 测试
访问:http://localhost:8080//confirm/i/sendMessage/salvation
结论
可以看到,发送了两条消息,第一条消息的 RoutingKey 为 “key”,第二条消息的 RoutingKey 为 “key2”,两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。
7.2 回退消息 1. mandatory 参数在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如 果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。那么如何让无法被路由的消息能够被发觉并处理?
通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。
2.代码测试 修改生产者代码package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
@Slf4j
@RestController
@RequestMapping("/confirm/i")
public class /confirm/iProducer implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {
public static final String EXCHANGE_NAME = "publish_/confirm/i";
public static final String ROUTING_KEY = "key";
public static final String ROUTING_KEY_2 = "key2";//没有连接队列的路由键
@Autowired
private RabbitTemplate rabbitTemplate;
// @Autowired
// private MyCallBack callBack;
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init(){
//使用本类作为确认回调接口实现类
rabbitTemplate.set/confirm/iCallback(this);
rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理
rabbitTemplate.setReturnCallback(this);
}
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message){
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message+":"+ROUTING_KEY,new CorrelationData("1"));
log.info("发送消息:{}, 路由键:{}", message, ROUTING_KEY);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_2, message+":"+ROUTING_KEY_2,new CorrelationData("2"));
log.info("发送消息:{}, 路由键:{}", message, ROUTING_KEY_2);
}
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData == null ? "" : correlationData.getId();
if(ack){
log.info("交换机消息确认成功!id:{}", id);
}else {
log.error("消息未成功投递到交换机,id:{},退回原因:{}",id, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息{}被服务器回退,退回原因:{},退回编码:{},交换机:{}, 路由键:{}", new String(message.getBody()), replyText, replyCode, exchange, routingKey);
}
}
结果
-
能够发送到交换机的信息都会被确认
-
不能投递到队列的消息将会回退给设置的回调接口实现类(这里是生产者)处理回退信息
-
重点代码
//依赖注入 rabbitTemplate 之后再设置它的回调对象 @PostConstruct public void init(){ //使用本类作为确认回调接口实现类 rabbitTemplate.set/confirm/iCallback(this); rabbitTemplate.setMandatory(true); //设置回退消息交给谁处理 rabbitTemplate.setReturnCallback(this); }
当我们可以通过Mandatory设置消息回退之后,其实有时候我们往往无法处理这些消息,最多进行日志打印,但是这样就显得消息的处理不够彻底,不够优雅,当生产者所在服务有多台机器的时候,手动复制日志十分麻烦,而且mandatory会增加生产者的复杂性。回想前面学过的知识,我我们知道可以设置一格死信队列存储这些回退消息,但是**遗憾的是这些不可路由的消息,根本无法通过路由键进入死信队列**,所以死信队列并不适合处理回退信息。
在RabbitMq中存在一种备份交换机的机制,其实就是类似备胎的接盘侠,的那个交换机接收到不可路由的消息之后就将回退的消息转发给备份交换机。这个交换机通常是fanout类型(将消息广播到所有队列),我们只需要对这个备份交换机绑定一个队列,这样那些无法路由的消息就能够进入此队列被处理。
1. 代码架构 2. 代码 队列交换机绑定package com.zhan.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BackupConfig {
private static final String /confirm/i_EXCHANGE = "/confirm/i_exchange";
private static final String BACKUP_EXCHANGE = "backup_exchange";//备份交换机
private static final String /confirm/i_QUEUE = "/confirm/i_queue";
private static final String BACKUP_QUEUE = "backup_queue";
private static final String WARNING_QUEUE = "warning_queue";
@Bean("/confirm/iQueue")
public Queue /confirm/iQueue(){
return QueueBuilder.durable(/confirm/i_QUEUE).build();
}
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE).build();
}
@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE).build();
}
@Bean("directExchange")
public DirectExchange directExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(/confirm/i_EXCHANGE)
//设置交换机的备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE)
.durable(true);
return (DirectExchange) exchangeBuilder.build();
}
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange(){
return new FanoutExchange(BACKUP_EXCHANGE);
}
@Bean
public Binding binding/confirm/iQueue(@Qualifier("/confirm/iQueue")Queue /confirm/iQueue, @Qualifier("directExchange") DirectExchange directExchange){
return BindingBuilder.bind(/confirm/iQueue).to(directExchange).with("key");
}
@Bean
public Binding bindingBackupQueue(@Qualifier("backupQueue")Queue backupQueue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(backupQueue).to(fanoutExchange);
}
@Bean
public Binding bindingWarningQueue(@Qualifier("warningQueue")Queue warningQueue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange){
return BindingBuilder.bind(warningQueue).to(fanoutExchange);
}
}
重点代码:配置备份交换机
@Bean("directExchange")
public DirectExchange directExchange(){
ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(/confirm/i_EXCHANGE)
//设置交换机的备份交换机
.withArgument("alternate-exchange", BACKUP_EXCHANGE)
.durable(true);
return (DirectExchange) exchangeBuilder.build();
}
生产者
package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.PostConstruct;
@Slf4j
@RestController
@RequestMapping("/confirm/i")
public class /confirm/iProducer implements RabbitTemplate./confirm/iCallback, RabbitTemplate.ReturnCallback {
public static final String EXCHANGE_NAME = "/confirm/i_exchange";
public static final String ROUTING_KEY = "key";
public static final String ROUTING_KEY_2 = "key2";//没有连接队列的路由键
@Autowired
private RabbitTemplate rabbitTemplate;
// @Autowired
// private MyCallBack callBack;
//依赖注入 rabbitTemplate 之后再设置它的回调对象
@PostConstruct
public void init(){
rabbitTemplate.set/confirm/iCallback(this);
rabbitTemplate.setMandatory(true);
//设置回退消息交给谁处理
rabbitTemplate.setReturnCallback(this);
}
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable("message") String message){
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message+":"+ROUTING_KEY,new CorrelationData("1"));
log.info("发送消息:{}, 路由键:{}", message, ROUTING_KEY);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_2, message+":"+ROUTING_KEY_2,new CorrelationData("2"));
log.info("发送消息:{}, 路由键:{}", message, ROUTING_KEY_2);
}
@Override
public void /confirm/i(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData == null ? "" : correlationData.getId();
if(ack){
log.info("交换机消息确认成功!id:{}", id);
}else {
log.error("消息未成功投递到交换机,id:{},退回原因:{}",id, cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息{}被服务器回退,退回原因:{},退回编码:{},交换机:{}, 路由键:{}", new String(message.getBody()), replyText, replyCode, exchange, routingKey);
}
}
消费者
package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class /confirm/iConsumer {
public static final String QUEUE_NAME = "/confirm/i_queue";
@RabbitListener(queues = QUEUE_NAME)
public void getMessage(Message message){
String msg = new String(message.getBody());
log.info("接收到队列"+QUEUE_NAME+"的消息:{}", msg);
}
}
报警消费者
package com.zhan.rabbitmq.proAndCon;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class WarningConsumer {
private static final String WARNING_QUEUE = "warning_queue";
@RabbitListener(queues = WARNING_QUEUE)
public void receiveMessage(Message message){
String msg = new String(message.getBody());
log.error("发现不可路由消息:{}", msg);
}
}
测试
访问:http://localhost:8080//confirm/i/sendMessage/你好
- 不可路由的消息会直接转发到备份交换机,即使有生产者实现回调借口不可路由消息也不会被回调函数处理,而是直接送入备份(fanout)交换机被广播到所绑定的队列,即mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,备份交换机优先级高。
在数学中幂等的概念或许比较抽象,但是在开发中幂等性是极为重要的。简单来说,对于同一个系统,在同样条件下,一次请求和重复多次请求对资源的影响是一致的,就称该操作为幂等的。比如说如果有一个接口是幂等的,当传入相同条件时,其效果必须是相同的。
特别是对于现在分布式系统下的 RPC 或者 Restful 接口互相调用的情况下,很容易出现由于网络错误等等各种原因导致调用的时候出现异常而需要重试,这时候就必须保证接口的幂等性,否则重试的结果将与第一次调用的结果不同,如果有个接口的调用链 A->B->C->D->E,在 D->E 这一步发生异常重试后返回了错误的结果,A,B,C也会受到影响,这将会是灾难性的。
2. 消息重复消费消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给MQ 返回 ack 时网络中断, 故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但 实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
3. 解决思路MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费 者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消 息时用该 id 先判断该消息是否已消费过。
4. 消费端的幂等性保障在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性, 这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。
主流的幂等性有两种操作:
- 唯一 ID+指纹码机制,利用数据库主键去重
- 利用 redis 的原子性去实现
唯一 ID+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存 在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数 据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
Redis 原子性
利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费
8.2 优先级队列 1. 使用场景在我们系统中有一个订单催付的场景,客户在商城下的订单,商城会及时将订单推送给我们,如 果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,对于客户对于商家来说,肯定是要分大客户和小客户的,大商家理应当然他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,但是redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景。所以订单量大了后采用 RabbitMQ 进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级。
2. 实现 控制台添加 代码中添加,设置队列优先级Map代码中添加,设置消息优先级params = new HashMap(); params.put("x-max-priority", 10); channel.queueDeclare("hello", true, false, false, params);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();注:
要让队列实现优先级需要做的事情有如下事情:
- 队列需要设置为优先级队列
- 消息需要设置消息的优先 级
- 消费者需要等待消息已经发送到队列中才去消费因为,这样才有机会对消息进行排序
public class Producer {
private static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
try (Channel channel = RabbitMqUtils.getChannel();) {
//给消息赋予一个 priority 属性
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
for (int i = 1; i <11; i++) {
String message = "info"+i;
if(i==5){
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
}else{
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
System.out.println("发送消息完成:" + message);
}
}
}
}
消费者
public class Consumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//设置队列的最大优先级 最大可以设置到 255 官网推荐 1-10 如果设置太高比较吃内存和 CPU
Map params = new HashMap();
params.put("x-max-priority", 10);
channel.queueDeclare(QUEUE_NAME, true, false, false, params);
System.out.println("消费者启动等待消费..............");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String receivedMessage = new String(delivery.getBody());
System.out.println("接收到消息:" + receivedMessage);
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag) -> {
System.out.println("消费者无法消费消息时调用,如队列被删除");
});
}
}
8.3 惰性队列
RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消 费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持 更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致 使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的 时间,也会阻塞队列的操作,进而无法接收新的消息。
1. 两种模式- default:默认模式
- lazy:惰性队列模式
可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default”和“lazy”。下面示 例中演示了一个惰性队列的声明细节:
Map2. 开销对比args = new HashMap (); args.put("x-queue-mode", "lazy"); channel.queueDeclare("myqueue", false, false, false, args);
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅 占用 1.5MB
9. RabbitMQ 集群 9.1. clustering 1. 使用集群原因如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况,该怎么办?单台 RabbitMQ 服务器可以满足每秒 1000 条消息的吞吐量,那么如果应用需要 RabbitMQ 服务满足每秒 10 万条消息的吞吐量呢?购买昂贵的服务器来增强单机 RabbitMQ 务的性能显得捉襟见肘,搭建一个 RabbitMQ 集群才是 解决实际问题的关键。
2. 搭建步骤-
修改 3 台机器的主机名称 vim /etc/hostname
-
配置各个节点的 hosts 文件,让各个节点都能互相识别对方
vim /etc/hosts ip1 node1 ip2 node2 ip3 node3
-
以确保各个节点的 cookie 文件使用的是同一个值
-
在 node1 上执行远程操作命令
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
- 启动 RabbitMQ 服务,顺带启动 Erlang 虚拟机和 RbbitMQ 应用服务(在三台节点上分别执行以 下命令)
rabbitmq-server -detached
- 在节点 2 执行
rabbitmqctl stop_app (rabbitmqctl stop 会将Erlang 虚拟机关闭,rabbitmqctl stop_app 只关闭 RabbitMQ 服务) rabbitmqctl reset rabbitmqctl join_cluster rabbit@node1 rabbitmqctl start_app(只启动应用服务)
- 在节点 3 执行
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@node2 rabbitmqctl start_app
- 集群状态
rabbitmqctl cluster_status
- 需要重新设置用户
# 创建账号 rabbitmqctl add_user admin 123 # 设置用户角色 rabbitmqctl set_user_tags admin administrator # 设置用户权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
- 解除集群节点(node2 和 node3 机器分别执行)
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app rabbitmqctl cluster_status rabbitmqctl forget_cluster_node rabbit@node2(node1 机器上执行)9.2 镜像队列
如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并 且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但 是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一 个短暂却会产生问题的时间窗。通过 publisherconfirm 机制能够确保客户端知道哪些消息己经存入磁盘,尽 管如此,一般不希望遇到因单点故障导致的服务不可用。
引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中 的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
1. 搭建步骤-
启动三台集群节点
-
随便找一个节点添加 policy
- 在 node1 上创建一个队列发送一条消息,队列存在镜像队列
-
停掉 node1 之后发现 node2 成为镜像队列
-
就算整个集群只剩下一台机器了 依然能消费队列里面的消息 说明队列里面的消息被镜像队列传递到相应机器里面了
联邦队列可以在多个 Broker 节点(或者集群)之间为单个队列提供均衡负载的功能。一个联邦队列可以 连接一个或者多个上游队列(upstream queue),并从这些上游队列中获取消息以满足本地消费者消费消息 的需求。
9.5 ShovelFederation 具备的数据转发功能类似,Shovel 够可靠、持续地从一个 Broker 中的队列(作为源端,即 source)拉取数据并转发至另一个 Broker 中的交换器(作为目的端,即 destination)。作为源端的队列和作为 目的端的交换器可以同时位于同一个 Broker,也可以位于不同的 Broker 上。Shovel 可以翻译为"铲子",是 一种比较形象的比喻,这个"铲子"可以将消息从一方"铲子"另一方。Shovel 行为就像优秀的客户端应用程 序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。



