第二章 `RabbitMQ`核心概念
2.1-大厂使用`RabbitMQ`2.2-`RabbitMQ`高性能原因2.3-`AMQP`协议2.4-`AMQP`核心概念2.5-RabbitMQ整体架构与消息流转2.6-`RabbitMQ`环境安装2.7-命令行与管控台结合讲解2.8-`SpringBoot`和`RabbitMQ`简单应用2.9-交换机讲解2.10-绑定、队列、消息、虚拟主机详解
第二章 RabbitMQ核心概念 2.1-大厂使用RabbitMQ- RabbitMQ介绍
是一个开源的消息代理和队列服务器,用来在不同应用直接共享数据,实现跨语言问题试用Erlang语言编写基于AMQP协议 滴滴,美团,头条,去哪儿
开源,性能优秀,稳定性保障提供可靠性消息投递模式(/confirm/i模式),返回模式(return)与SpringAMQP完美整合,API丰富集群模式丰富,表达式配置,HA模式(高可用模式),镜像队列模型保证数据不丢失的前提下做到高可靠性、可用性
- Erlang语言用于交换机领域,Erlang优点:Erlang有着和原生Socket一样的延迟
AMQP - advanced message queuing protocol 高级消息队列协议 pk JMS java message service
AMQP 是具有现代特征的二进制协议,是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计
AMQP协议模式(->server->virtual host->exchange)3次主要的关联
- Server:又称Broker,接受客户端的连接,实现AMQP实体服务Connection:连接Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可建立多个Channel,每个Channel代表一个会话任务。类似登录的session。Message:消息,服务器与应用程序之间传递的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟投递等高级特性,Body则是消息体内容。Virtual Host:虚拟地址,用于进行逻辑隔离(例如redis 中有16个db,db0-15,比如内存16G,不是说每个db分配1G,db0也是可以存储16个G的,这就是逻辑隔离,不是物理隔离),最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同一个Virtual Host 里面不能有重名的Exchange和QueueExchange:交换机,接收消息,根据路由键转发消息到指定绑定的队列上Binding:Exchange与Queue之间的虚拟连接,binding可以包含routing keyRouting key : 一个路由规则,虚拟机可以用它来确定如何路由一个特定消息Queue:消息队列,保存消息并将他们转发到消费者
整体架构(生产者关注交换机,消费者监听队列,双方解耦)
- 安装Erlang安装RabbitMQ(推荐直接用Docker安装)另起文章说明
# 第一步:查看仓库里的RabbitMQ docker search rabbitmq # 第二步:安装 docker pull rabbitmq:3.8 # 第三步:启动 docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.8 # 第四步:安装插件 docker ps docker exec -it 镜像ID /bin/bash rabbitmq-plugins enable rabbitmq_management # 第五步:访问地址 http://ip地址:15672,这里的用户名和密码默认都是guest2.7-命令行与管控台结合讲解
命令行工具
# 想看什么就List
rabbitmqctl list_exchanges
rabbitmqctl list_queues
rabbitmqctl list_users
# 想加什么就add
rabbitmqctl add_user username password
# 想清空什么就purge
rabbitmqctl purge_queue
# 想删除什么就delete
rabbitmqctl delete_queue
# 想知道就 --help
rabbitmqctl reset 移除所有数据,必须先停止RabbitMQ
rabbitmqctl join_cluster [--ram] 组成集群命令
rabbitmqctl cluster_status 查看集群状态
rabbitmqctl change_cluster_node_type disc|ram 修改集群节点的存储方式,磁盘或内存
rabbitmqctl forget_cluster_node [--offline] 忘记节点(摘除节点)
管控台
overview总览connections 连接channels 信道exchanges 交换机
type 一栏,4种方式,direct直连模式 fanout广播模式 topic 路由模式 headers 数据头模式features(特性)一栏,D(durable)持久化,即使停止了,在开启也会有数据 I(internal)内部的 queues 队列admin 用户管理
- ConnectionFactory 获取连接工厂Connection 新建连接Channel 获取数据信道,可发送和接收数据Producer 和Consumer 生产者和消费者
生产者
public class Producer {
private static final String EXCHANGE_NAME = "exchange_01";
private static final String ROUTING_KEY = "routingkey_01";
private static final String QUEUE_NAME = "queue_01";
private static final String IP_ADDRESS = "127.0.0.1";
private static final String USER = "admin";
private static final String PASSWORD = "admin";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
//1-创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(IP_ADDRESS);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USER);
connectionFactory.setPassword(PASSWORD);
//2-创建连接Connection
Connection connection = connectionFactory.newConnection();
//3-创建信道Channel
Channel channel = connection.createChannel();
//4-创建交换机Exchange-交换机名称,type,是否持久化,是否自动删除,没有其他参数
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
//5-创建队列Queue-队列名称,是否持久化,是否排他(保证顺序消费),是否自动删除,没有其他参数
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//6-队列与交换机进行绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
String msg = "hello,rabbitmq";
//7-发送数据
//basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
// 如果没有指定交换机,则会默认使用AMQP default这个交换机,同时也会默认认为队列名称 == routing key
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_BASIC,msg.getBytes());
//8-关闭资源
channel.close();
connection.close();
}
}
消费者
public class Consumer {
private static final String QUEUE_NAME = "queue_01";
private static final String IP_ADDRESS = "127.0.0.1";
private static final String USER = "admin";
private static final String PASSWORD = "admin";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
//1-创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(IP_ADDRESS);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USER);
connectionFactory.setPassword(PASSWORD);
//2-创建连接Connection
Connection connection = connectionFactory.newConnection();
//3-创建信道Channel
Channel channel = connection.createChannel();
//4-创建消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("msg-"+new String(body));
}
};
//5-消费数据-队列名称,设置是否自动签收,消费者
channel.basicConsume(QUEUE_NAME,true,consumer);
//6-关闭资源
channel.close();
connection.close();
}
}
2.9-交换机讲解
Exchange:接收消息,并根据路由键转发消息 到所绑定的队列
交换机属性
Name 交换机名称Type 交换机类型 direct-topic-fanout-headersDurability 是否持久化Auto delete 当最后一个绑定到Exchange上的队列删除后,自动删除该ExchangeInternal 当前Exchange是否用于RabbitMQ内部使用,默认false,基本不会修改这个属性值Arguments 扩展参数
交换机类型
Direct 直连交换机
所有发送到Direct Exchange的消息,都会被转发到RoutingKey中指定的QueueDriect 模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何绑定操作,消息传递时,RoutingKey必须完整匹配才会被队列接收,否则该消息会被抛弃。
Topic 主题交换机,路由规则交换机
所有发送到Topic Exchange 的消息被转发到所有关心RoutingKey中指定Topic的Queue上
Exchange 将RoutingKey和某个Topic进行模糊匹配,此时队列需要绑定一个Topic
# 匹配一个或多个词 比如: log.# 能够匹配log.info.a 或 log.message* 只匹配一个词 比如:log.* 能够匹配 log.info
Fanout 扇形交换机,播发交换机
不处理路由键(RoutingKey),只需要简单的将队列绑定到交换机上发送到交换机的消息,都会被转发到与该交换机绑定的所有队列上转发消息是最快的 2.10-绑定、队列、消息、虚拟主机详解
Binding 绑定
Exchange和Exchange、Queue之间的连接关系Binding中可以包含RoutingKey或其他参数
Queue 队列
实际存储消息数据Durablity是否持久化,Durable(持久化):是,Transient:否(瞬时状态)Auto delete 如果是yes,代表当最后一个监听被移除后,该Queue会自动删除
Message 消息
本质就是一段数据,由Properties和Payload(Body体)组成
常用属性:delivery mode(配送模式)、headers(自定义属性)
其他属性:content_type、content_encoding、priority(优先级)
correlation_id(消息唯一id,相关ack、幂等时使用)、reply_to(做重回队列时使用,比如消息消费失败了回到指定队列)、expiration(过期时间)、message_id(消息id)
//设置message参数
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2) //配送模式2,表示持久化,即使rabbitmq重启数据也不会丢失
.contentEncoding("UTF-8")
.expiration("10000") //设置过期时间
.build();
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,properties,msg.getBytes());
Virtual host虚拟主机
虚拟地址,用于进行逻辑隔离,最上层的消息路由,类似Redis16个db,这里要深入理解下一个虚拟主机中,可以有若干个Exchange和Queue同一个虚拟主机中,不可以有重名的Exchange和Queue



