1.基本概念RabbitMQ 是实现了高级消息队列协议的开源消息代理==软件==(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ是一套开源的消息队列服务软件,由以高性能、健壮以及可伸缩性出名的Erlang写成。
RabbitMQ服务支持下列操作系统:
- Linux
- Windows NT 到 10
- Windows Server 2003 到 2016
- macOS
- Solaris
- FreeBSD
- TRU64
- VxWorks
RabbitMQ支持下列编程语言:
- Python
- Java
- Ruby
- PHP
- C#
- Javascript
- Go
- Elixir
- Objective-C
- Swift
- 可伸缩性:集群服务
- 消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存
- erl
- RabbitMQServer
消息:指在应用之间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
消息队列:是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。
- 消息发布者只管把消息发布到 MQ 中而不用管谁来取。
- 消息使用者只管从 MQ 中取消息而不管是谁发布的。
- 这样发布者和使用者都不用知道对方的存在。
消息队列是一种应用间的异步协作机制,那什么时候需要使用 MQ 呢?
以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ 。
2.RabbitMQ 特点RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
具体特点:
-
可靠性
-
灵活的路由
-
消息集群
-
高可用
-
多种协议
-
多语言客户端
-
管理界面
-
跟踪机制
-
插件机制
所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
消息流
2.RabbitMQ 基本概念 RabbitMQ 内部结构
1、Message
消息,由消息头和消息体组成。
- 消息体是不透明的。
- 消息头由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
2、Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
3、Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
4、Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
5、Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
6、Connection
网络连接,比如一个TCP连接。
7、Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
8、Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
9、Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的vhost 是 / 。
10、Broker
表示消息队列服务器实体。
3.AMQP 中的消息路由生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
AMQP 的消息路由过程
Exchange 类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。*headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,*所以直接看另外三种类型:
1、direct
direct 交换器
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
2、fanout
fanout 交换器
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
3、topic
topic 交换器
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。==#匹配0个或多个单词,*==匹配不多不少一个单词。
“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”只会匹配到“audit.irs”。
4.RabbitMQ 运行和管理1、首先确定Erlang的安装和rabbitmq的安装没问题,环境变量也配置正确。
2、开启插件(我就是错在这个地方)
rabbitmq_management 是管理后台的插件、我们要开启这个插件才能通过浏览器访问登录页面
进入到sbin目录下:rabbitmq-plugins enable rabbitmq_management
3、开启服务:rabbitmq-server start
4、开启浏览器访问 http://localhost:15672
默认userName : guest password : guest
5.Java 客户端访问RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。
1、maven 工程的 pom 文件中添加依赖
com.rabbitmq amqp-client 5.13.1
2、消息生产者
package com.zhao.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//设置 RabbitMQ 地址
factory.setHost("localhost");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola";
//发布消息
byte[] messageBodyBytes = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close();
conn.close();
}
}
3、消息消费者
package com.zhao.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
final Channel channel = conn.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//声明队列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "hola";
//绑定队列,通过键 hola 将队列和交换器绑定起来
channel.queueBind(queueName, exchangeName, routingKey);
while(true) {
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}
4、启动 RabbitMQ 服务器
5、运行 Consumer
先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。
6、运行 Producer
接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:
Consumer 控制台
在Erlang 中有两个概念:节点和应用程序。
节点就是 Erlang 虚拟机的每个实例,而多个 Erlang 应用程序可以运行在同一个节点之上。
节点之间可以进行本地通信(不管他们是不是运行在同一台服务器之上)。比如一个运行在节点A上的应用程序可以调用节点B上应用程序的方法,就好像调用本地函数一样。
如果应用程序由于某些原因奔溃,Erlang 节点会自动尝试重启应用程序。
RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及通过添加更多的节点来线性扩展消息通信吞吐量。
RabbitMQ 内部利用 Erlang 提供的分布式通信框架 OTP 来满足上述需求,使客户端在失去一个 RabbitMQ 节点连接的情况下,还是能够重新连接到集群中的任何其他节点继续生产、消费消息。
1.RabbitMQ 集群中的一些概念RabbitMQ 会始终记录以下四种类型的内部元数据:
1、队列元数据
包括队列名称和它们的属性,比如是否可持久化,是否自动删除
2、交换器元数据
交换器名称、类型、属性
3、绑定元数据
内部是一张表格记录如何将消息路由到队列
4、vhost 元数据
为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性
在单一节点中,RabbitMQ 会将所有这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上可以确保队列和交换器在节点重启后能够重建。
而在集群模式下同样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。
如果在集群中创建队列,集群只会在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容)。结果是只有队列的所有者节点知道有关队列的所有信息,因此当集群节点崩溃时,该节点的队列和绑定就消失了,并且任何匹配该队列的绑定的消息也丢失了。还好 RabbitMQ 2.6.0 之后提供了镜像队列以避免集群节点故障导致的队列内容不可用。
RabbitMQ 集群中可以共享 user、vhost、exchange 等,所有的数据和状态都是必须在所有节点上复制的,RabbitMQ 节点可以动态的加入到集群中。
当在集群中声明队列、交换器、绑定的时候,这些操作会直到所有集群节点都成功提交元数据变更后才返回。
集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,但是它的执行比磁盘节点要好。内存节点可以提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用。
RabbitMQ 只要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但直到该节点恢复,否则无法更改任何东西。
三.RabbitMQ 实战教程 1.什么是MQ- 消息队列(简称MQ),从字面意思来看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
- 主要用途:不同进程 Process / 线程 Thread 之间通信
- 不同进程之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个。
- 不同进程之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须 对收到的消息进行排队,因此诞生了事实上的消息队列。
- MQ 框架非常之多,比较流行的有 RabbitMQ、ActiveMQ、ZeroMQ、kafka,以及阿里开源的 RocketMQ 。
1、
2、
3、进入C:RabbitMQServerrabbitmq_server-3.9.7sbin输入命令:
rabbitmq-plugins enable rabbitmq_management
4、命令:
停止:net stop RabbitMQ
启动:net start RabbitMQ
5、在浏览器中输入地址查看:http://127.0.0.1:15672/
6、使用默认账号登录:guest/ guest
2.添加用户 1.添加 admin 用户 2.用户角色1、超级管理员(administrator)
可登录管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
2、监控者(monitoring)
可登录管理控制台,同时可以查看 rabbitmq 节点的相关信息(进程数,内存使用情况,磁盘使用情况等)。
3、策略制定者(policymaker)
可登录管理控制台,同时可以对 policy 进行管理。但无法查看节点的相关信息。
4、普通管理者(management)
仅可登录管理控制台,无法看到节点信息,也无法对策略进行管理。
5、其他
无法登录管理控制台,通常就是普通的生产者和消费者。
3.创建Virtual Hosts1、新建 Virtual Hosts
2、选中 Admin 用户,设置权限
3、看到权限已加
4.管理界面中的功能 5.五种队列 1.简单队列P:消息的生产者
C:消息的消费者
生产者将消息发送到队列,消费者从队列中获取消息。
1、导入RabbitMQ的客户端依赖
com.rabbitmq amqp-client 5.13.1
2、获取MQ的连接
package com.zhao.rabbitmq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
public static Connection getConnection() throws Exception{
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("localhost");
//设置端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setUsername("admin");
factory.setPassword("guest");
factory.setVirtualHost("testhost");
//通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
3、生产者发送消息到队列
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhao.rabbitmq.util.ConnectionUtil;
public class Send {
private final static String QUEUE_NAME = "q_test_01";
public static void main(String[] args) throws Exception{
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消息内容
String message = "Hello World!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
//关闭信道和连接
channel.close();
connection.close();
}
}
4、消费者从队列中获取消息
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zhao.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv {
private final static String QUEUE_NAME = "q_test_01";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消费消息
//boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(QUEUE_NAME, true, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:" + new String(body, "UTF-8"));
//long deliveryTag = envelope.getDeliveryTag();
//确认消息
///channel.basicAck(deliveryTag, false);
}
});
}
}
2.Work模式
一个生产者、2个消费者。
一个消息只能被一个消费者获取。
1、消费者1
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zhao.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
//消费消息
//boolean autoAck = false;
String consumerTag = "";
//监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, true, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:" + new String(body, "UTF-8"));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
//long deliveryTag = envelope.getDeliveryTag();
//确认消息
//channel.basicAck(deliveryTag, false);
}
});
}
}
2、消费者2
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zhao.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
//消费消息
//boolean autoAck = false;
String consumerTag = "";
//监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, true, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:" + new String(body, "UTF-8"));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//long deliveryTag = envelope.getDeliveryTag();
//确认消息
//channel.basicAck(deliveryTag, false);
}
});
}
}
3、生产者
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhao.rabbitmq.util.ConnectionUtil;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception{
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消息内容
for (int i = 0; i < 100; i++) {
String message = "Hello World!" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
//关闭信道和连接
channel.close();
connection.close();
}
}
4、测试
测试结果:
(1)消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
(2)消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。
- 其实,这样是不合理的,因为消费者1线程停顿的时间短,应该是消费者1要比消费者2获取到的消息多才对。RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询分发消息。
- 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Ack 就可以做到。basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ACK 也是异步发送给队列,从队列的视角来看,总是会有一批消息已推送但尚未获得 ACK 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息的 ACK 确认后,才会向消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发送给消费者。
- 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
- 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ 就会分派消息。它不看消费者未应答的数目,只是盲目的将消息发给轮询指定的消费者。为了解决这个问题,我们使用basicQos(1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。
5、消息确认机制
消息一旦被消费者接收,队列中的消息就会被删除。
那么问题来了:RabbitMQ 怎么知道消息被接收了呢?
如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ 无从得知,这样消息就丢失了!
因此,RabbitMQ 有一个 ACK 机制。当消费者获取消息后,会向 RabbitMQ 发送回执 ACK ,告知消息已经被接收。不过这种回执 ACK 分两种情况:
- 自动 ACK:消息一旦被接收,消费者自动发送 ACK。
- 手动 ACK:消息被接收后,不会发送 ACK ,需要手动调用。
那么该如何选择呢??
这需要看消息的重要性:
- 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
- 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。
但是需要注意的是,手动ACK存在一定的问题。如果消息接收方已经接收但是在业务处理过程中出现异常,那么该消息并未处理完成。但此时消息队列中消息已经被自动删除,因而就会造成消息丢失,这是个非常严重的问题。
3.订阅模式- 1个生产者,多个消费者
- 每一个消费者都有自己的一个队列
- 生产者没有将消息直接发送到队列,而是发送到了交换机
- 每个队列都要绑定到交换机
- 生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
- 注意:一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
1、消息的生产者,向交换机中发送消息
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhao.rabbitmq.util.ConnectionUtil;
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//消息内容
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME, "",null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭信道和连接
channel.close();
connection.close();
}
}
注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
2、消费者1
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zhao.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_fanout_1";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//消费消息
boolean autoAck = false;
String consumerTag = "";
//监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:" + new String(body, "UTF-8"));
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
}
});
}
}
3、消费者2
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zhao.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_fanout_2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//消费消息
boolean autoAck = false;
String consumerTag = "";
//监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:" + new String(body, "UTF-8"));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
}
});
}
}
4、测试结果
同一个消息被多个消费者获取
4.路由模式1、生产者
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhao.rabbitmq.util.ConnectionUtil;
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//消息内容
String message = "Hello World!-->direct";
channel.basicPublish(EXCHANGE_NAME, "dir",null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭信道和连接
channel.close();
connection.close();
}
}
2、消费者1
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zhao.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_direct_1";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dir");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//消费消息
boolean autoAck = false;
String consumerTag = "";
//监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:" + new String(body, "UTF-8"));
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
}
});
}
}
3、消费者2
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zhao.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_direct_2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "dir");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//消费消息
boolean autoAck = false;
String consumerTag = "";
//监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:" + new String(body, "UTF-8"));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
}
});
}
}
5.主题模式(通配符模式)
Topic Exchange ——将路由键和某模式进行匹配。
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费消息。
1、生产者
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.zhao.rabbitmq.util.ConnectionUtil;
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//消息内容
String message = "Hello World!-->topic";
channel.basicPublish(EXCHANGE_NAME, "top.1",null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭信道和连接
channel.close();
connection.close();
}
}
2、消费者1
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zhao.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_topic_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "top.*");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//消费消息
boolean autoAck = false;
String consumerTag = "";
//监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:" + new String(body, "UTF-8"));
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
}
});
}
}
3、消费者2
package com.zhao.rabbitmq.simple;
import com.rabbitmq.client.*;
import com.zhao.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_topic_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws Exception {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//在连接中创建信道
Channel channel = connection.createChannel();
//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列到交换器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
//同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//消费消息
boolean autoAck = false;
String consumerTag = "";
//监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
System.out.println("消费的消息体内容:" + new String(body, "UTF-8"));
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
}
});
}
}
https://www.jianshu.com/p/79ca08116d57
https://blog.csdn.net/lzx1991610/article/details/102970854



