一、MQ简介
在计算机科学中,消息队列是一种线程通信或同一进程的不同线程之间的通信方式。消息的发送者和消息的接收者不需要同时与消息队列交互。消息就保存在队列中,直到接收者取回它。
1、特点:
MQ是生产者-消费者模型的一个典型的代表,一端不断地往消息队列中写入消息,而令一端则可以读取或者订阅队列中的消息。MQ和JMS类似但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
2、优缺点:
应用耦合、异步处理、流量削峰
解耦:
传统模式的缺点:
系统间耦合性太强,系统中A直接调用系统B和系统C的代码,如果将来系统D也要接入,系统A还需要修改代码,过于麻烦。
中间件模式:
中间件模式的优点:
将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要修改代码
异步:
传统模式:
传统模式的缺点:
一些非必要的业务逻辑以同步方式进行,太耗费时间。
中间件模式:
中间件模式的优点:
将小写写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何的修改。
削峰:
传统模式:
传统模式的缺点:
并发量大的时候,所有的请求都直接堆到数据库,造成数据库连接异常。
中间件模式:
中间件模式的优点:
系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢的拉取消息,在生产中这个短暂的高峰期积压是允许的。
缺点:系统可用性降低,系统复杂度提升。
3、使用场景
消息队列,是分布式系统中重要的组件,其通用的使用场景可以简单的描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是使用消息队列的时候。在项目中,将一些无需及时响应且耗时的操作,进行了异步处理,而这种异步处理消息的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
4、为什么使用RabbitMQ
总结如下:
基于AMQP协议
高并发(是一个容器的概念,服务器可以接收的最大任务数量)
高性能(是一个速度的概念,单位时间内服务器可以处理的任任务数量)
高可用(是一个持久的概念,单位时间内服务器可以正常工作的时间比例)
强大的社区支持
支持插件
支持多语言
二、Rabbit MQ专业术语
1、Producing
producing不仅仅是发送消息。发送消息的程序叫做producer生产者
2、Queue
Queue是一个消息盒子的名称。它存活在RabbitMQ里。虽然消息流经RabbitMQ和你的应用里,但是他们只能在Queue中保存,Queue没有任何边界的限制,存多少消息都可以,本质上是一个无限的缓存,许多生产者都可以向Queue发送消息,许多消费者都可以从Queue中接收消息。
3、Consuming
Consuming的意思和接收类似,等待接收消息的程序叫做消费者。
注意:生产者消费者和代理不一定非要在在同一台机器上。
4、ConnectingFactory、Connection、Channel
ConnectionFactory 、 Connection 、 Channel都是Rabbit MQ对外提供的API中最基本的对象。 Connection是RabbitMQ的socket连接,它封装了socket协议相关部分逻辑 Connecting Factory是Connection的生产工厂 Channel是我们与Rabbit MQ打交道的最重要的一个接口,我们大部分的操作都是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。 5、 Message acknowledgment 在实际应用中,可能会发生消费者收到 Queue 中的消息,但没有处理完成就宕机(或出现其 他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消 费者在消费完消息后发送一个回执给 RabbitMQ , RabbitMQ 收到消息回执( Message acknowledgment )后才将该消息从 Queue 中移除;如果 RabbitMQ 没有收到回执并检测到消费 者的 RabbitMQ 连接断开,则 RabbitMQ 会将该消息发送给其他消费者(如果存在多个消费者) 进行处理。这里不存在 timeout 概念,一个消费者处理消息时间再长也不会导致该消息被发送给 其他消费者,除非它的 RabbitMQ 连接断开。 这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给 RabbitMQ ,这将会导致严重的 bug—— Queue 中堆积的消息会越来越多;消费者重启后会重复 消费这些消息并重复执行业务逻辑 … 6、 Message durability 如果我们希望即使在 RabbitMQ 服务重启的情况下,也不会丢失消息,我们可以将 Queue 与 Message 都设置为可持久化的( durable ),这样可以保证绝大部分情况下我们的 RabbitMQ 消 息不会丢失。但依然解决不了小概率丢失事件的发生(比如 RabbitMQ 服务器已经接收到生产者 的消息,但还没来得及持久化该消息时 RabbitMQ 服务器就断电了),如果我们需要对这种小概 率事件也要管理起来,那么我们要用到事务 7、 Prefetch count 前面我们讲到如果有多个消费者同时订阅同一个 Queue 中的消息, Queue 中的消息会被平摊 给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另 外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置 prefetchCount 来 限制 Queue 每次发送给每个消费者的消息数,比如我们设置 prefetchCount=1 ,则 Queue 每次 给每个消费者发送一条消息;消费者处理完这条消息后 Queue 会再给该消费者发送一条消息。8、Exchange
生产者将消息投递到 Queue 中,实际上这在 RabbitMQ 中这种事情永远都不会发生。实际的情 况是,生产者将消息发送到 Exchange (交换器,下图中的 X ),由 Exchange 将消息路由到一个 或多个 Queue 中(或者丢弃)。9、routing key
生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key ,来指定这个消息的 路由规则,而这个 routing key 需要与 Exchange Type 及 binding key 联合使用才能最终生 效。 在 Exchange Type 与 binding key 固定的情况下(在正常使用时一般这些内容都是固定配置 好的),我们的生产者就可以在发送消息给 Exchange 时,通过指定 routing key 来决定消息流 向哪里。 RabbitMQ 为 routing key 设定的长度限制为 255 bytes 。 10、 Binding RabbitMQ 中通过 Binding 将 Exchange 与 Queue 关联起来,这样 RabbitMQ 就知道如何正确地 将消息路由到指定的 Queue 了。 11、 Binding key 在绑定( Binding ) Exchange 与 Queue 的同时,一般会指定一个 binding key ;消费者将消 息发送给 Exchange 时,一般会指定一个 routing key ;当 binding key 与 routing key 相匹 配时,消息将会被路由到对应的 Queue 中。这个将在 Exchange Types 章节会列举实际的例子加 以说明。 在绑定多个 Queue 到同一个 Exchange 的时候,这些 Binding 允许使用相同的 binding key 。 binding key 并不是在所有情况下都生效,它依赖于 Exchange Type ,比如 fanout 类型的 Exchange 就会无视 binding key ,而是将消息路由到所有绑定到该 Exchange 的 Queue 。 12、 Exchange Types RabbitMQ 常用的 Exchange Type 有 fanout 、 direct 、 topic 、 headers 这四种( AMQP 规范里还提到两种 Exchange Type ,分别为 system 与 自定义 ,这里不予以描述),下面分别进 行介绍。 fanout fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所 有与它绑定的 Queue 中。上图中,生产者( P )发送到 Exchange ( X )的所有消息都会路由到图中的两个 Queue ,并最 终被两个消费者( C1 与 C2 )消费。 direct direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 binding key 与 routing key 完全匹配的 Queue 中。
以上图的配置为例,我们以 routingKey=”error” 发送消息到 Exchange ,则消息会路由到 Queue1 ( amqp.gen-S9b… ,这是由 RabbitMQ 自动生成的 Queue 名称)和 Queue2 ( amqp.gen-Agl… );如果我们以 routingKey=”info” 或 routingKey=”warning” 来 发送消息,则消息只会路由到 Queue2 。如果我们以其他 routingKey 发送消息,则消息不会路由 到这两个 Queue 中。 topic 前面讲到 direct 类型的 Exchange 路由规则是完全匹配 binding key 与 routing key ,但这种 严格的匹配方式在很多情况下不能满足实际业务需求。 topic 类型的 Exchange 在匹配规则上进行 了扩展,它与 direct 类型的 Exchage 相似,也是将消息路由到 binding key 与 routing key 相 匹配的 Queue 中,但这里的匹配规则有些不同,它约定: routing key 为一个句点号 . 分隔的字符串(我们将被句点号 . 分隔开的每一段独立的字符 串称为一个单词),如 “stock.usd.nyse” 、 “nyse.vmw” 、 “quick.orange.rabbit” binding key 与 routing key 一样也是句点号 . 分隔的字符串 binding key 中可以存在两种特殊字符 * 与 # ,用于做模糊匹配,其中 * 用于匹配一个单 词, # 用于匹配多个单词(可以是零个)
以上图中的配置为例, routingKey=”quick.orange.rabbit” 的消息会同时路由到 Q1 与 Q2 , routingKey=”lazy.orange.fox” 的消息会路由到 Q1 , routingKey=” lazy.brown.fox” 的消息会路由到 Q2 , routingKey=”lazy.pink.rabbit” 的消息会路由到 Q2 (只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配); routingKey=”quick.brown.fox” 、 routingKey=”orange” 、 routingKey=” quick.orange.male.rabbit” 的消息将会被丢弃,因为它们没有匹配任何 bindingKey 。 headers headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而 是根据发送的消息内容中的 headers 属性进行匹配。 在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到 Exchange 时, RabbitMQ 会取到 该消息的 headers (也是一个键值对的形式),对比其中的键值对是否完全匹配 Queue 与 Exchange 绑定时指定的键值对;如果完全匹配则消息会路由到该 Queue ,否则不会路由到该 Queue 。 该类型的 Exchange 目前用的不多(不过也应该很有用武之地),因此不做重点介绍。 RPC MQ 本身是基于异步的消息处理,前面的示例中所有的生产者( P )将消息发送到 RabbitMQ 后 不会知道消费者( C )处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完 成后再进行下一步处理。这相当于 RPC ( Remote Procedure Call ,远程过程调用)。在 RabbitMQ 中也支持 RPC 。
RabbitMQ 中实现 RPC 的机制是: 客户端发送请求(消息)时,在消息的属性( MessageProperties ,在 AMQP 协议中定义 了 14 种 properties ,这些属性会随着消息一起发送)中设置两个值 replyTo (一个 Queue 名称,用于告诉服务器处理完成后将通知我的消息发送到这个 Queue 中)和 correlationId (此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根 据这个 id 了解哪条请求被成功执行了或执行失败) 服务器端收到消息并处理 服务器端处理完消息后,将生成一条应答消息到 replyTo 指定的 Queue ,同时带上 correlationId 属性 客户端之前已订阅 replyTo 指定的 Queue ,从中收到服务器的应答消息后,根据其中的 correlationId 属性分析哪条请求被执行了,根据执行结果进行后续业务处理
三、Work queues-工作模式队列
1、工作模式队列 - 消息轮询分发 (Round-robin) 通过 Helloworld 工程我们已经能够构建一个简单的消息队列的基本项目,项目中存在几个角 色 : 生产者、消费者、队列,而对于我们真实的开发中,对于消息的消费者通过是有多个的,比 如在实现用户注册功能时,用户注册成功,会给响对应用户发送邮件,同时给用户发送手机短 信,告诉用户已成功注册网站或者 app 应用,这种功能在大部分项目开发中都比较常见,而对于 helloworld 的应用中虽然能够对消息进行消费,但是有很大问题 : 消息消费者只有一个,当消息 量非常大时,单个消费者处理消息就会变得很慢,同时给节点也带来很大压力,导致消息堆积越 来越多。对于这种情况, RabbitMQ 提供了工作队列模式,通过工作队列提供做个消费者,对 MQ 产生的消息进行消费,提高 MQ 消息的吞吐率,降低消息的处理时间 。处理模型图如下 总结 从结果可以看出消息被平均分配到两个消费方,来对消息进行处理,提高了消息处理效率,创 建多个消费者来对消息进行处理。这里 RabitMQ 采用轮询来对消息进行分发时保证了消息被平 均分配到每个消费方,但是引入新的问题 : 真正的生产环境下,对于消息的处理基本不会像我们 现在看到的这样,每个消费方处理的消息数量是平均分配的,比如因为网络原因,机器 cpu, 内存 等硬件问题,消费方处理消息时同类消息不同机器进行处理时消耗时间也是不一样的,比如 1 号 消费者消费 1 条消息时 1 秒, 2 号消费者消费 1 条消息是 5 秒,对于 1 号消费者比 2 号消费者处理消息 快,那么在分配消息时就应该让 1 号消费者多收到消息进行处理,也即是我们通常所说的 ” 能者多 劳 ”, 同样 Rabbitmq 对于这种消息分配模式提供了支持。 问题:任务量很大,消息虽然得到了及时的消费,单位时间内消息处理速度加快,提高了吞吐 量,可是不同消费者处理消息的时间不同,导致部分消费者的资源被浪费。 解决:采用消息公平分发。 总结:工作队列 - 消息轮询分发 - 消费者收到的消息数量平均分配,单位时间内消息处理速度加 快,提高了吞吐量。 2、 工作模式队列 - 消息公平分发 (fair dispatch) 在案例 01 中对于 消息分发采用的是默认轮询分发,消息应答采用的自动应答模式 ,这是因为 当消息进入队列, RabbitMQ 就会分派消息。它不看消费者为应答的数目,只是盲目的将第 n 条 消息发给第 n 个消费者。 为了解决这个问题, 我们使用 basicQos(prefetchCount = 1) 方法,来限制 RabbitMQ 只 发不超过 1 条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送 。执 行模型图如下 :
总结 从结果可以看出 1 号消费者消费消息数量明显高于 2 号,即消息通过 fair 机制被公平分发到每个消 费者。 问题:生产者产生的消息只可以被一个消费者消费,可不可以被多个消费者消费呢? 解决:采用发布与订阅模式。 总结:工作队列 - 公平轮询分发 - 根据不同消费者机器硬件配置,消息处理速度不同,收到的消息 数量也不同,通常速度快的处理的消息数量比较多,最大化使用计算机资源。适用于生成环境。 四、 Publish/Subscribe- 消息的发布与订阅模式队列
从图中看到 : 消息产生后不是直接投送到队列中,而是将消息先投送给 Exchange 交换机,然后消息经过 Exchange 交换机投递到相关队列 多个消费者消费的不再是同一个队列,而是每个消费者消费属于自己的队列。 总结 从结果可以看出生产者发送了一条消息,用于邮件发送和短信发送的消费者均可以收到消息进行 后续处理。 问题:生产者产生的消息所有消费者都可以消费,可不可以指定某些消费者消费呢? 解决:采用 direct 路由模式。 2、 Routing- 路由模式队列 通过案例 03, 可以看到,生产者将消息投送给交换机后,消息经交换机分发到不同的队列即 : 交 换机收到消息,默认对于绑定到每个交换机的队列均会接收到交换机分发的消息,对于案例 03 的交换机的消息分发 Exchange Types 为 fanout 类型,通常在真正项目开发时会遇到这种情况 : 在对项目信息输出日志进行收集时,会把日志 (error warning,info) 分类进行输出,这时通过 Exchange Types 中的 direct 类型就可以实现,针对不同的消息,在对消息进行消费时,通过 Exchange types 以及 Routing key 设置的规则 ,便可以将不同消息路由到不同的队列中然 后交给不同消费者进行消费操作。模型图如下 :
从图中可以看出 : 1. 生产者产生的消息投给交换机 2. 交换机投送消息时的 Exchange Types 为 direct 类型 3. 消息通过定义的 Routing Key 被路由到指定的队列进行后续消费 总结 从结果可以看出生产者发送了多条设置了路由规则的消息,消费者可以根据具体的路由规则消费 对应队列中的消息,而不是所有消费者都可以消费所有消息了。 问题:生产者产生的消息如果场景需求过多需要设置很多路由规则,可不可以减少? 解决:采用 topic 主题模式。 3、 Topics- 主题模式队列 通过案例 04 看到消息通过交换机 Exchange Type 以及 Routing Key 规则,可以将消息路由到指 定的队列,也符合在工作中的场景去使用的一种方式,对于 RabbitMq 除了 direct 模式外, Mq 同样还提供了 topics 主题模式来对消息进行匹配路由,比如在项目开发中,拿商品模块来说, 对于商品的查询功能在对商品进行查询时我们将查询消息路由到查询对应队列,而对于商品的添 加、更新、删除等操作我们统一路由到另外一个队列来进行处理,此时采用 direct 模式可以实 现,但对于维护的队列可能就不太容易进行维护,如果涉及模块很多,此时对应队列数量就很 多,此时我们就可以通过 topic 主题模式来对消息路由时进行匹配,通过指定的匹配模式将消 息路由到匹配到的队列中进行后续处理。对于 routing key 匹配模式定义规则举例如下 : routing key 为一个句点号 . 分隔的字符串(我们将被句点号 . 分隔开的每一段独立的字符 串称为一个单词),如 “stock.usd.nyse” 、 “nyse.vmw” 、 “quick.orange.rabbit” routing key 中可以存在两种特殊字符 * 与 # ,用于做模糊匹配,其中 * 用于匹配一个单 词, # 用于匹配多个单词(可以是零个) 例如 : 以上图中的配置为例: routingKey=”quick.orange.rabbit” 的消息会同时路由到 Q1 与 Q2 , routingKey=”lazy.orange.fox” 的消息会路由到 Q1 , Q2, routingKey=”lazy.brown.fox” 的消息会路由到 Q2 , routingKey=”lazy.pink.rabbit” 的消息会路由到 Q2 ; routingKey=”quick.brown.fox”; routingKey=”orange”; routingKey=”quick.orange.male.rabbit” 的消息将会被丢弃,因为它们没有匹配任何 bindingKey 。
总结 从结果可以看出生产者发送了多条设置了路由匹配规则 ( 主题 ) 的消息,根据不同的路由匹配规则 ( 主题 ) ,可以将消息根据指定的 routing key 路由到匹配到的队列中,也是在生产中比较常见的一 种消息处理方式。 问题: RabbitMQ 本身是基于异步的消息处理,是否可以同步实现? 解决:采用 RPC 模式。 4、RPC- 远程过程调用模式队列 MQ 本身是基于异步的消息处理,前面的示例中所有的生产者( P )将消息发送到 RabbitMQ 后 不会知道消费者( C )处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完 成后再进行下一步处理。这相当于 RPC ( Remote Procedure Call ,远程过程调用)。在 RabbitMQ 中也支持 RPC 。
RabbitMQ 中实现 RPC 的机制是: 1. 客户端发送请求(消息)时,在消息的属性( MessageProperties ,在 AMQP 协议中定义了 14 种 properties ,这些属性会随着消息一起发送)中设置两个值 replyTo (一个 Queue 名 称,用于告诉服务器处理完成后将通知我的消息发送到这个 Queue 中)和 correlationId (此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根 据这个 id 了解哪条请求被成功执行了或执行失败) 2. 服务器端收到消息并处理 3. 服务器端处理完消息后,将生成一条应答消息到 replyTo 指定的 Queue ,同时携带 correlationId 属性 客户端之前已订阅 replyTo 指定的 Queue ,从中收到服务器的应答消息后,根据其中的 correlationId 属性分析哪条请求被执行了,根据执行结果进行后续业务处理。



