RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。
二、四大核心概念 1. 生产者
2. 交换机产生数据发送消息的程序是生产者
3. 队列交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
4. 消费者队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
三、RabbitMQ 工作原理
| 名词 | 描述 |
|---|---|
| Broker | 接收和分发消息的应用,RabbitMQ Server 就是 Message Broker |
| Virtual Host | 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ Server 提供的服务时,可以划分出多个 VHost,每个用户在自己的 VHost 创建 Exchange/Queue 等 |
| Connection | Publisher/Consumer 和 Broker之间的 TCP连接 |
| Channel | 如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 Connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 Thread 创建单独的 Channel 进行通讯,AMQP Method 包含了 Channel Id 帮助客户端和 Message Broker 识别 Channel,所以 Channel 之间是完全隔离的。Channel 作为轻量级的 Connection,极大减少了操作系统建立 TCP Connection 的开销 |
| Exchange | Message 到达 Broker 的第一站,根据分发规则,匹配查询表中的 Routing Key,分发消息到 Queue 中去。常用的类型有:直接direct(point-to-point),主题topic(publish-subscribe)、标题headers、扇出fanout(multicast) |
| Queue | 消息最终被送到这里等待 Consumer 取走 |
| Binding | Exchange 和 Queue 之间的虚拟连接,Binding 中可以包含 routing_key,Binding 信息被保存到 Exchange 中的查询表中,用于 Message 的分发依据 |
四、RabbitMQ 核心部分 1. Hello World
1.1 生产者发送单个消息的生产者和接收消息并打印出来的消费者;
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂IP,连接RabbitMQ的队列
factory.setHost("192.168.137.155");
// 用户名
factory.setUsername("guest");
// 密码
factory.setPassword("guest");
try {
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送完毕");
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
1.2 消费者
public class Consumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.137.155");
factory.setUsername("guest");
factory.setPassword("guest");
try {
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.basicConsume(QUEUE_NAME, true,
// 接收消息
(consumerTag, message) -> {
System.out.println(message);
System.out.printf("接收到消息: %srn", new String(message.getBody()));
},
// 取消消息
consumerTag -> {
System.out.printf("中断消费消息: %srn", consumerTag);
});
System.out.println("等待接收消费...");
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
2. Work Queues
2.1 生产者工作队列(又称任务队列)的主要思想是,避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
public class Producer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取信道
Channel channel = RabbitMQUtil.getChannel();
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 从控制台接收消息
Scanner scanner = new Scanner(System.in);
System.out.println("输入消息: ");
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.printf("发送消息: %srn", message);
}
}
}
2.2 消费者
public class Consumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 消费者消费消息
channel.basicConsume(QUEUE_NAME, true,
// 接收消息
(consumerTag, message) -> {
System.out.printf("接收到消息: %srn", new String(message.getBody()));
},
// 取消消息
consumerTag -> {
System.out.printf("中断消费消息: %srn", consumerTag);
});
System.out.println("Consumer, 2,等待接收消息...");
}
}
2.3 启动两个消费者,轮训分发消息
3. 消息应答
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续再发送给该消费者的消息,它都无法接收到。
3.1 自动应答为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了,RabbitMQ 可以把该消息删除了。
3.2 消息应答的方法消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现 连接 或者 Channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
// 用于肯定确认(告诉 RabbitMQ 该消息成功的被处理,可以将其丢弃了) void basicAck(long deliveryTag, boolean multiple); // 用于否定确认 void basicNack(long deliveryTag, boolean multiple, boolean requeue); // 用于拒绝消息(不处理该消息了直接拒绝,可以将其丢弃了) void basicReject(long deliveryTag, boolean requeue);
3.3 消息自动重新入队multiple 的 true 和 false 代表不同意思
- true 代表批量应答 Channel 上未应答的消息
比如说 Channel 上有传送 tag 的消息 5, 6, 7, 8 当前 tag 是 8,那么此时 5-8 的这些还未应答的消息都会被批量确认收到消息应答;- false 同上面相比
只会应答 tag=8 的消息,5, 6, 7 这三个消息依然不会被确认收到消息应答;
3.4 消息手动应答如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭 或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
3.4.1 生产者默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,修改消费者代码。
public class Producer {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取信道
Channel channel = RabbitMQUtil.getChannel();
// 2. 队列持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 从控制台接收消息
Scanner scanner = new Scanner(System.in);
System.out.println("输入消息: ");
while (scanner.hasNext()) {
String message = scanner.next();
// 3. 消息持久化
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
System.out.printf("发送消息: %srn", message);
}
}
}
3.4.2 消费者
// 1. false手动应答 boolean autoAck = false; channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
public class Consumer1 {
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("Consumer1,睡1秒");
// 获取信道
Channel channel = RabbitMQUtil.getChannel();
// int prefetchCount = 1;
int prefetchCount = 2;
channel.basicQos(prefetchCount);
// 1. false手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck,
// 接收消息
(consumerTag, message) -> {
try {
// 睡1秒
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("接收到消息: %srn", new String(message.getBody(), "UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
},
// 取消消息
consumerTag -> {
System.out.printf("中断消费消息: %srn", consumerTag);
});
}
}
4. RabbitMQ 持久化
4.1 队列持久化(Features = D)刚刚我们已经看到了如何 处理任务不丢失 的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的 消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将 队列 和 消息都标记为持久化。
之前我们创建的队列都是非持久化的,RabbitMQ 如果重启的化,该队列就会被删除掉,如果要队列实现持久化 需要在声明队列的时候把 durable 参数设置为持久化
// 2. 队列持久化 boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
4.2 消息持久化但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
要想让消息实现持久化,需要在消息生产者修改代码,MessageProperties.PERSISTENT_TEXT_PLAIN 添加这个属性。
// 3. 消息持久化
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish("", QUEUE_NAME, props, message.getBytes("UTF-8"));
5. 不公平分发(Prefetch count = 1)将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考 发布确认。
在最开始的时候我们知道 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的话,就会导致处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。
int prefetchCount = 1; // int prefetchCount = 2; channel.basicQos(prefetchCount);
5.1 预取值意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我新任务,我目前只能处理一个任务,然后 RabbitMQ 就会把该任务分配给没有那么忙的其他空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 Worker 或者改变其他存储任务的策略。
6. 发布确认 6.1 发布确认原理本身消息的发送就是异步发送的,所以在任何时候,Channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置 “预取计数” 值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。
例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ACK。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。
消息应答和 Qos 预取值对用户吞吐量有重大影响。通常增加预取值将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。
6.2 发布确认的策略 6.2.1 开启发布确认的方法生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认默认是没有开启的,如果要开启需要调用方法 /confirm/iSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法
// 开启发布确认 channel./confirm/iSelect();6.2.2 单个确认发布
这是一种简单的确认方式,它是一种 同步确认发布 的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。
private static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMQUtil.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
// 开启发布确认
channel./confirm/iSelect();
// 开始时间
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = String.format("单个确认发布: %d", i);
channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
// 单个消息确认
boolean flag = channel.waitFor/confirm/is();
if (flag) {
System.out.printf("%s, 确认rn", message);
}
}
System.out.printf("%d个消息,单个确认发布,耗时: %s毫秒", MESSAGE_COUNT, System.currentTimeMillis() - begin).println();
}
6.2.3 批量确认发布
上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
private static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMQUtil.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
// 开启发布确认
channel./confirm/iSelect();
// 批量确认消息数量
int batchSize = 100;
// 开始时间
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = String.format("批量确认发布: %d", i);
channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
// 判断发送消息达到100条,批量确认一次
if (i % batchSize == 0) {
channel.waitFor/confirm/is();
System.out.printf("%s, 确认rn", message);
}
}
System.out.printf("%d个消息,批量确认发布,耗时: %s毫秒", MESSAGE_COUNT, System.currentTimeMillis() - begin).println();
}
6.2.4 异步确认发布
异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否发送成功:
public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMQUtil.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, true, false, false, null);
// 开启发布确认
channel./confirm/iSelect();
ConcurrentSkipListMap map = new ConcurrentSkipListMap<>();
// 消息确认监听器(异步通知)
channel.add/confirm/iListener(
// 监听那些消息成功
(deliveryTag, multiple) -> {
System.out.printf("成功的消息: %s", deliveryTag).println();
// 2. 删除已确认的消息
if (multiple) {
// 批量清除
ConcurrentNavigableMap concurrentNavigableMap = map.headMap(deliveryTag);
concurrentNavigableMap.clear();
} else {
map.remove(deliveryTag);
}
},
// 监听那些消息失败
(deliveryTag, multiple) -> {
System.out.printf("失败的消息: %s", deliveryTag).println();
System.out.println(map.get(deliveryTag));
});
// 开始时间
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = String.format("异步确认发布: %d", i);
channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
// 1. 记录所有发送的消息
map.put(channel.getNextPublishSeqNo(), message);
}
System.out.printf("%d个消息,异步确认发布,耗时: %s毫秒", MESSAGE_COUNT, System.currentTimeMillis() - begin).println();
}
6.2.4.1 如何处理异步未确认消息
6.2.5 以上 3 种发布确认速度对比最好的解决的解决方案,就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ConcurrentlinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。
| 发布确认 | 优缺点 |
|---|---|
| 单独发布消息 | 同步等待确认,简单,但吞吐量非常有限 |
| 批量发布消息 | 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题 |
| 异步处理 | 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些 |
在之前,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程)。现在我们将消息传达给多个消费者。这种模式称为 “发布/订阅”。
7.1 Exchanges 概念为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者。
RabbitMQ 消息传递模型的核心思想是:生产者生产的消息不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
7.2 Exchanges 类型 7.2.1 直接(direct) 7.2.2 主题(topic) 7.2.3 标题(headers) 7.2.4 扇出(fanout) 7.3 无名 Exchange相反,生产者只能将消息发送到交换机(Exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列,或是放到许多队列中,还是应该丢弃。这就的由交换机的类型来决定。
之前我们都使用的是默认交换,通过空字符串("")进行标识。
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
7.4 临时队列exchange:空字符串表示默认或无名称交换机;
routingKey:消息能路由发送到队列中,其实是由 routingKey(bindingkey) 绑定 key 指定的;
我们连接到 RabbitMQ 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
String queueName = channel.queueDeclare().getQueue();7.5 绑定(bindings)
7.6 扇出(fanout)binding 是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。
比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定:
7.6.1 Fanout 实战Fanout 是将接收到的消息广播到它知道的所有队列中。系统中默认有的 exchange 类型
7.6.2 生产者Logs 和临时队列的绑定关系如下图:
public class EmitLogFanout {
// 交换机名称
private static final String EXCHANGE_NAME = "fanout_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 从控制台接收消息
Scanner scanner = new Scanner(System.in);
System.out.println("输入消息: ");
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.printf("发送消息: %srn", message);
}
}
}
7.6.3 消费者
public class ReceiveLogsFanout1 {
// 交换机名称
private static final String EXCHANGE_NAME = "fanout_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机 与 队列
channel.queueBind(queueName, EXCHANGE_NAME, "");
// 接收消息
channel.basicConsume(queueName, true,
// 接收消息
(consumerTag, message) -> {
System.out.println(message);
System.out.printf("ReceiveLogsFanout1,接收到消息: %srn", new String(message.getBody()));
},
// 取消消息
consumerTag -> {
System.out.printf("ReceiveLogsFanout1,中断消费消息: %srn", consumerTag);
});
System.out.println("ReceiveLogsFanout1,等待接收消费...");
}
}
7.7 直接(direct)
在上面,我们构建了一个简单的日志记录系统。能够向许多接收者广播日志消息。现在我们将向其中添加一些特别的功能,比如说我们只让某个消费者订阅发布部分消息。
例如:我们希望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志,消息避免浪费磁盘空间。
fanout 这种交换机类型并不能给我们很大的灵活性,它只能进行无意识的广播,在这里我们将使用 direct 这种交换机类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。
7.7.1 多重绑定bindings 是绑定交换机和队列之间的桥梁关系。也可以理解:队列只对它绑定的交换机的消息感兴趣。
绑定用 参数routingKey 来表示也可称该参数为 binding key,创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, "routingKey") 绑定之后的意义由其交换类型决定。
在上面图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange,队列 Q2 绑定键有两个,一个绑定键为 black,另一个绑定键为 green。
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black 或 green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
7.7.2 Direct 实战如上图所示,如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 都相同,在这种情况下虽然绑定类型是 direct,但是它表现的就和 fanout 有点类似了,就跟广播差不多。
public class EmitLogDirect {
// 交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 从控制台接收消息
Scanner scanner = new Scanner(System.in);
System.out.println("输入消息: ");
while (scanner.hasNext()) {
String message = scanner.next();
// info、warning、error
String routingKey = "error";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.printf("发送消息: %srn", message);
}
}
}
7.7.4 消费者一
public class ReceiveLogsDirect1 {
// 交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个队列
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
// 绑定交换机 与 队列
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
// 接收消息
channel.basicConsume(queueName, true,
// 接收消息
(consumerTag, message) -> {
System.out.println(message);
System.out.printf("ReceiveLogsDirect1,接收到消息: %srn", new String(message.getBody()));
},
// 取消消息
consumerTag -> {
System.out.printf("ReceiveLogsDirect1,中断消费消息: %srn", consumerTag);
});
System.out.println("ReceiveLogsDirect1,等待接收消费...");
}
}
7.7.5 消费者二
public class ReceiveLogsDirect2 {
// 交换机名称
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 声明一个队列
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);
// 绑定交换机 与 队列
channel.queueBind(queueName, EXCHANGE_NAME, "error");
// 接收消息
channel.basicConsume(queueName, true,
// 接收消息
(consumerTag, message) -> {
System.out.println(message);
System.out.printf("ReceiveLogsDirect2,接收到消息: %srn", new String(message.getBody()));
},
// 取消消息
consumerTag -> {
System.out.printf("ReceiveLogsDirect2,中断消费消息: %srn", consumerTag);
});
System.out.println("ReceiveLogsDirect2,等待接收消费...");
}
}
7.8 主题(topic)
在上面,改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。
7.8.1 Topic 的要求尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性,比如说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候就只能使用 topic 类型
发送到 topic 类型的交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”,这种类型的。当然这个单词列表最多不能超过 255 个字节。
7.8.2 Topic 匹配案例*(星号):可以代替一个单词;
#(井号):可以替代零个或多个单词;
当一个队列绑定键是 #,那么这个队列将接收所有数据,就有点像 fanout 了;
如果队列绑定键当中没有 # 和 * 出现,那么该队列绑定类型就是 direct 了;
Q1 绑定的是中间带 orange 带 3 个单词的字符串(*.orange.*)
Q2 绑定的是最后一个单词是 rabbit 的 3 个单词(*.*.rabbit),和第一个单词是 lazy 的多个单词(lazy.#)
| RoutingKey | 结果 |
|---|---|
| quick.orange.rabbit | 被队列 Q1Q2 接收到 |
| lazy.orange.elephant | 被队列 Q1Q2 接收到 |
| quick.orange.fox | 被队列 Q1 接收到 |
| lazy.brown.fox | 被队列 Q2 接收到 |
| lazy.pink.rabbit | 虽然满足两个绑定但只被队列 Q2 接收一次 |
| quick.brown.fox | 不匹配任何绑定不会被任何队列接收到会被丢弃 |
| quick.orange.male.rabbit | 是四个单词不匹配任何绑定会被丢弃 |
| lazy.orange.male.rabbit | 是四个单词但匹配 Q2 |
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final Map bindingKeyMap = new HashMap<>();
static {
bindingKeyMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox", "被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配 Q2");
}
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 发送消息
for (Map.Entry entry : bindingKeyMap.entrySet()) {
String routingKey = entry.getKey();
String message = entry.getValue();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.printf("EmitLogTopic, 发送消息: %s", message).println();
}
}
}
7.8.5 消费者一
public class ReceiveLogsTopic1 {
// 交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明队列
String queueName = "Q1";
channel.queueDeclare(queueName, false, false, false, null);
// 队列绑定交换机
String routingKey = "*.orange.*";
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
// 接收消息
channel.basicConsume(queueName, true,
(consumerTag, message) -> {
System.out.printf("ReceiveLogsTopic1,接收到消息: %s, queueName: %s, routingKey: %s",
new String(message.getBody(), "UTF-8"),
queueName,
message.getEnvelope().getRoutingKey()).println();
},
consumerTag -> {
});
System.out.println("ReceiveLogsTopic1, 等待接收消费...");
}
}
7.8.6 消费者二
public class ReceiveLogsTopic2 {
// 交换机名称
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 声明队列
String queueName = "Q2";
channel.queueDeclare(queueName, false, false, false, null);
// 队列绑定交换机
String routingKey1 = "*.*.rabbit";
String routingKey2 = "lazy.*.*";
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2);
// 接收消息
channel.basicConsume(queueName, true,
(consumerTag, message) -> {
System.out.printf("ReceiveLogsTopic2,接收到消息: %s, queueName: %s, routingKey: %s",
new String(message.getBody(), "UTF-8"),
queueName,
message.getEnvelope().getRoutingKey()).println();
},
consumerTag -> {
});
System.out.println("ReceiveLogsTopic2, 等待接收消费...");
}
}
8. 死信队列
8.1 死信的概念
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,Producer 将消息投递到 Broker 或者直接到 Queue 里了,Consumer 从 Queue 取出消息进行消费,但某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
8.2 死信的来源 8.2.1 消息 TTL 过期应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中,还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效;
// 1. 消息TTL过期,设置过期时间100秒(也可以在生产方设置)
arguments.put("x-message-ttl", 100 * 1000);
// 1. 消息TTL过期,设置TTL过期时间10秒(ttl: time to live)
AMQP.BasicProperties props = new AMQP.BasicProperties().builder().expiration("10000").build();
channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, props, message.getBytes("UTF-8"));
8.2.2 队列达到最大长度
队列满了,无法再添加数据到 mq 中
// 2. 队列达到最大长度,设置正常队列长度限制
arguments.put("x-max-length", 6);
8.2.3 消息被拒绝
basic.reject 或 basic.nack,并且 requeue=false
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
System.out.printf("拒绝到消息: %s, queueName: %s, routingKey: %s",
msg,
NORMAL_QUEUE,
message.getEnvelope().getRoutingKey()).println();
8.3 死信实战
8.4 生产者
public class Producer {
// 交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
// routingKey
private static final String NORMAL_ROUTING_KEY = "zhangsan";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = RabbitMQUtil.getChannel();
// 1. 消息TTL过期,设置TTL过期时间10秒(ttl: time to live)
// AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
// .expiration("10000").build();
for (int i = 0; i < 10; i++) {
String message = String.format("info: %s", i);
// channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, props, message.getBytes("UTF-8"));
// TimeUnit.SECONDS.sleep(2);
channel.basicPublish(NORMAL_EXCHANGE, NORMAL_ROUTING_KEY, null, message.getBytes("UTF-8"));
System.out.printf("发送消息: %s", message).println();
}
}
}
8.5 消费者一
public class Consumer1 {
// 交换机名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String DEAD_EXCHANGE = "dead_exchange";
// 队列名称
private static final String NORMAL_QUEUE = "normal-queue";
private static final String DEAD_QUEUE = "dead-queue";
// routingKey
private static final String NORMAL_ROUTING_KEY = "zhangsan";
private static final String DEAD_ROUTING_KEY = "lisi";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 声明正常交换机、死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 正常队列参数
Map arguments = new HashMap<>();
// 指定死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 指定死信RoutingKey
arguments.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
// 1. 消息TTL过期,设置过期时间100秒(也可以在生产方设置)
// arguments.put("x-message-ttl", 100 * 1000);
// 2. 队列达到最大长度,设置正常队列长度限制
// arguments.put("x-max-length", 6);
// 声明正常队列
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 队列 绑定 交换机
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, DEAD_ROUTING_KEY);
boolean autoAck = false;
// 正常 接收消息
channel.basicConsume(NORMAL_QUEUE, autoAck,
(consumerTag, message) -> {
String msg = new String(message.getBody(), "UTF-8");
if (msg.endsWith("6")) {
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
System.out.printf("拒绝到消息: %s, queueName: %s, routingKey: %s",
msg,
NORMAL_QUEUE,
message.getEnvelope().getRoutingKey()).println();
} else {
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
System.out.printf("接收到消息: %s, queueName: %s, routingKey: %s",
msg,
NORMAL_QUEUE,
message.getEnvelope().getRoutingKey()).println();
}
},
consumerTag -> {
});
System.out.println("Consumer1, 等待接收消息...");
}
}
8.6 消费者二
public class Consumer2 {
// 队列名称
private static final String DEAD_QUEUE = "dead-queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
// 接收消息
channel.basicConsume(DEAD_QUEUE, true,
(consumerTag, message) -> {
System.out.printf("接收到消息: %s, queueName: %s, routingKey: %s",
new String(message.getBody(), "UTF-8"),
DEAD_QUEUE,
message.getEnvelope().getRoutingKey()).println();
},
consumerTag -> {
});
System.out.println("Consumer2, 等待接收消息...");
}
}
9. 延迟队列
9.1 延迟队列概念
9.2 延迟队列使用场景队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后,或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
- 订单在十分钟之内未支付则自动取消;
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒;
- 用户注册成功后,如果三天内没有登陆则进行短信提醒;
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员;
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议;
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务;
如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做;
如:对于如果账单一周内未支付则进行自动结算这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景;
9.3 RabbitMQ 中的 TTL如:订单十分钟内未支付则关闭,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。
9.3.1 消息设置 TTLTTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间;
单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为**“死信”**。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
// 发送消息的时候,延迟时长 message.getMessageProperties().setExpiration(ttlTime);9.3.2 队列设置 TTL
@Bean("queueA")
public Queue queueA() {
Map arguments = new HashMap<>(3);
// 设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
// 设置TTL
arguments.put("x-message-ttl", 10 * 1000);
return QueueBuilder
.durable(QUEUE_A)
.withArguments(arguments)
.build();
}
9.3.3 两者的区别
- 如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列会被丢到死信队列中);
- 第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;
- 如果不设置 TTL,表示消息永远不会过期;
- 如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
RabbitMQ 实现延时队列的两大要素
- 死信队列
- 设置 TTL



