目录
1. 简述
2. 角色定义
3. 可靠性
4. 运维
5. AMQP0.9.1简介
1. 简述
RabbitMQ,被广发使用的开源的消息中间件。
2. 角色定义
2.1. 在使用rabbitmq过程中,涉及5种角色。
producerconsumerexchangebindingqueue
2.2. producer
producer,发送消息的用户应用。
可执行声明exchange,发布消息等操作。发布消息调用channel.basicPublish(),提供exchangeName,routingKey,消息内容等。
2.3. consumer
consumer,接受消息的用户应用。
可执行声明exchange和queue、定义binding、订阅队列等操作。
2.4. exchange
RabbitMQ消息模型的一个核心思想:生产者不可能将消息直接放入队列。
exchange位于producer和queue之间,一边接收producer发送的消息,一边将消息推给queue。由exchange type控制处理消息的规则。
exchange type主要种类如下:
directtopicfanout
direct类型
为消费者提供了对消息的选择性接收的能力。通过binding key 绝对匹配 消息的routing key,确定消息发送到哪些queue。未匹配到binding key的消息被丢弃。
topic类型
为消费者提供了对消息的更灵活的选择性接收的能力。可以更灵活的建立消息到queue的关系。
发布者发布消息时提供的routing-key由多个用.连接的单词构成,可任意单词,但通常具有含义。
消费者定义绑定时提供的bindingkey通过通配符匹配routingkey,决定消息发到哪些queue。未匹配的消息被丢弃。
通配符说明:*代表一个单词,#代表多个单词。
fanout类型
收到的消息广播给所有queue。一个消息被放到多个队列,从而交付给多个消费者。
2.5. binding
由消费者定义,位于exchange和queue之间,通过binding key(routing_key)标识queue对exchange中哪些消息感兴趣。下图可以清晰的体现binding的位置。
routing_key的具体含义是与exchange-type直接相关的,fanout-type时routing_key无意义。
指令
建立queue和exchange之间的绑定 channel.queueBind(queueName, "logs", "");rabbitmqctl list_bindings
2.6. queue
存储消息的FIFO队列。使用exchange时,由消费者定义queue和binding。
可不使用exchange,直接作为工作队列,每个消息只交付给一个消费者。生产者将消息直接放于队列,broker采用轮询robin方式,将消息分发给消费者。
声明queue时,可配置的属性:
| name | |
| 是否持久化 | |
| 是否排他性 | 仅供某个连接使用,当这个连接断开后该队列自动删除。 |
| 是否自动删除 | 没有消费者订阅这个队列时,队列自动删除 |
| 参数 | 如消息存活时长TTL、队列长度等 |
3. 可靠性
3.1. ack回告和确认
当连接失败时,处于传输过程中的msg需要重新传递,此时需要ack确定何时进行重传。
支持两个方向的回告
consumer向server,告知已经收到或处理消息server告知producer,告知已经收到消息
TCP保证网络层的可靠性,ack和确认则保证应用层的可靠性,表示两层含义:接受者已经收到消息,接下来由接受者负责处理消息。
流程简述
- 消费者收到消息且处理(入库转发等)向broker发送ackbroker收到ackbroker释放消息
ack提供至少一次的交付保证,无ack(消息可能丢失)则提供至多一次的交付。
3.2. 心跳发现中断的TCP连接
AMQP 0-9-1协议要求提供心跳机制,确保应用层能及时发现中断的TCP连接。
3.3. borker
持久化支持:避免broker丢失消息,需要将'持久性exchangequeuemsg'持久到磁盘,以便在处理broker重启硬件问题宕机后恢复。
集群与高可用:RabbitMQ的所有定义(exchangebinginguser等)将在集群所有节点中进行镜像。
对queue,默认仅存在单独节点,可配置为多个节点。
3.4. consumer
consumer应进行幂等处理而不是简单的排重。
broker会对重复消息设置redelivered标签,consumer也可依据该标签。
当订阅的queue被删除后,consumer可收到cancel通知,以进行处理。
consumer可以拒绝收到的消息,使用basic.reject。
3.5. producer
producer从'网络问题'恢复时,需要对'未收到broker ack的消息'进行再传递。
在broker已发ack但producer未收到情况下,进行再次传递后,会造成消息重复,因此需要consumer进行排重或幂等处理。
4. 运维
| docker 镜像 | rabbitmqhttps://registry.hub.docker.com/_/rabbitmq/ |
| management plugin | 提供针对RabbitMQ nodes and clusters的管理和监控的能力。 |
| 启动容器 | docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 8080:15672 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guestp rabbitmq:3-management |
| maven | amqp-client |
5. AMQP0.9.1简介
5.1. what
AMQP 即 Advanced Message Queuing Protocol 高级消息队列协议。AMQP0.9.1是一种消息协议,使得客户端与消息中间件broker可以进行交互。
5.2. message broker
message broker有两个职责,接收由生产者发布的消息,将消息路由给消费者。
5.3. AMQP0.9.1模型概述
模型图示
模型概述
- 消息被发布到exchange依据分发规则binding,exchange将消息的拷贝分分发给queuebroker将queue中的消息交付给订阅该queue的消费者(push),或者消费者从queue进行预先拉取(fetch/pull)
消息确认概念
面对网络问题或消费者处理问题,AMQP0.9.1采取了消息确认概念。当消息交付给消费者后,消费者需告知broker,可以是自动的,也可以是由消费者执行。当broker收到来自消费者的交付确认后,borker将删除对应queue的相应消息。
5.4. exchange和exchangeType
5.4.1. exchange简介
exchange负责接收消息和路由给0或多个队列。路由算法与exchangeType和binding规则有关。
声明时属性
NameDurability 是否持久化Auto-delete 无绑定queue时是否自动删除Arguments 与插件或特性相关的参数
5.4.2. Direct exchange
默认类型,routing_key与binding_key进行精准匹配,将消息拷贝路由到匹配的queue。
场景举例
多worker的分布式task,在消费者之间进行循环robin。
5.4.3. Fanout exchange
忽略routing_key,而是进行广播,即将消息拷贝路由到该exchange连接的所有queue。
场景举例
大规模多人在线(MMO)的游戏,通知所有用户某些信息,如选手积分榜更新、其他全局性事件体育新闻网站,实时告知用户比赛分数变化分布式系统,广播变量状态和配置更新
5.4.4. Topic exchange
消息routing_key与binding_key进行通配符匹配,适用于发布订阅模式,消息的多播路由,消费者考虑选择哪些类型的消息。
场景举例
与地理位置相关的分布式数据,例如销售地点多worker参与的后台任务处理,要求每批worker负责特定的任务集股票价格更新、其他金融数据更新与类别或标签相关的新闻更新,例如特定的体育项目或团队
5.5.5. Headers exchange
忽略消息routing_key,而是基于消息headers中属性匹配binding数据进行路由。
5.5. queue
存储消息,供用户消费。
5.6. binding
exchange依据binding规则,决定将消息路由到哪些queue。
5.7. consumer
消费消息的方式
push API,通过订阅方式进行交付,推荐行为pull API,大多数情况下低效,不推荐使用
5.8. 消息ack
消息ack用于解决broker何时删除queue中消息的问题。
两种ack方式
自动ack模式,broker向消费者发送消息之后
channel.basicConsume#autoAck参数传递true明确ack模式,消费者决定发送ack的时机
channel.basicAck
5.9. 拒绝消息
消费者收到消息后,无论处理结果如何,都可以拒绝消息,并可以选择是否requeue。
5.10. 预先获取
多个消费者共享一个queue,规定broker每次向消费者推送多少条消息。简单的负载均衡、提升吞吐量。
5.11. connection
应用级别的TCP连接,当应用不需要连接时,需要进行关闭。
5.12. channel
建立在同一个TCP连接之上的轻量级连接,在一个connection基础上,可以创建多个channel。connection关闭,则关联的所有channel关闭。
多线程情况下,一个线程对应一个channel,不可以共享channel。



