栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ学习笔记

RabbitMQ学习笔记

文章目录
  • RabbitMQ学习笔记
    • 三个最重要的概念:
    • 配置说明
    • 官方教程摘要
      • 1. 工作队列(任务队列 Work Queue)
      • 2. publish/subscribe
      • 3. RPC(Remote Procedure Call)
    • 阿里云消息队列RabbitMQ版

RabbitMQ学习笔记
三个最重要的概念:
  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.
配置说明
  1. 主要配置文件位于/%AppData%/RabbitMQ.config.example中

    参考Configuration — RabbitMQ

    经典配置语法

    %% this is a comment
    [
      {rabbit, [
          {tcp_listeners, [5673]}
        ]
      }
    ].
    

    对应的文件后缀是.config.

    新式配置语法

    # this is a comment
    listeners.tcp.default = 5673
    

    对应的文件后缀是.conf

官方教程摘要 1. 工作队列(任务队列 Work Queue)

工作队列的主要思想是避免立即执行资源密集型任务而必须等待它完成。将任务封装为消息并发送到队列,后台运行的工作者进程将取出任务并执行工作。

工作队列在web应用中尤其有用。

  • Round-robin dispatching : 循环调度

使用工作队列的优点之一是能够轻松地进行并行工作,如果正在进行一个积压任务,则增加更多的worker即可。

  • Message acknowledgment机制

    Message acknowledgment机制可以保证在某个worker突然挂掉以后未处理完的消息不会丢失。消费者会发送一个应答(ack)告诉MQ某个特定的消息被接受到或已被处理,此时MQ可以自由的删除它。一旦消费者挂了,此时就不会给MQ发送ack,MQ会认为消息没有被完全处理,并将重新加入队列。如果同时有其他消费者在线,则MQ将迅速将消息分发给其他消费者。

    如果忘记发送应答信号,则MQ可能会一直redeliver,从而占用掉大量内存。如果发现这一现象,可以用rabbitmqctl来debug

    rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
    
  • 消息耐久性(Message Durability)

    当RabbitMQ退出或崩溃时,它将丢失队列和消息,除非告诉它不这么做。要确保消息不会丢失,需要做两件事:将队列和消息都标记为Durable

    1. 将队列置为durable

      在queue_declare时将durable置为True即可,注意生产者和消费者都需要更改。

      • RabbitMQ不允许对已经声明的队列更改参数。
    2. 将消息置为persistent

      在publish时将delivery_mode置为2即可。

      hannel.basic_publish(exchange='',
                            routing_key="task_queue",
                            body=message,
                            properties=pika.BasicProperties(
                               delivery_mode = 2, # make message persistent
                            ))
      
      • 所谓的persistent不是牢靠的,只是将message写入缓存而不是硬盘。如果想要牢靠的persistent可以用_publisher /confirm/is_。
  • 公平分发(Fair dispatch)

    在多消费者情况下,MQ只是将第n个消息发给第n个消费者,而不会管这个worker是不是处于忙碌状态,因为RabbitMQ在消息进入队列后就立即将其分发出去了。让prefetch_count=1可以让RabbitMQ不会同时给worker多条消息,只在消息处理完并且前一个已经发送应答后才分发新的消息,而且MQ会将消息分发给不busy的worker。

    具体配置方法为:

    channel.basic_qos(prefetch_count=1)
    
2. publish/subscribe

使用publish/subscribe机制的作用是讲同一个消息分发给不同的消费者。

  • exchange⭐

    RabbitMQ中消息传递模型的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者通常根本不知道消息是否会传递到任何队列。相反,生产者只能将消息发到exchange上,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。exchange必须准确地知道如何处理它收到的消息:是否应将其附加到特定队列?是否应将其附加到多个队列?或者应该被丢弃?其规则由exchange类型定义。exchange有direct, topic, headers和fanout四种类型。

    • fanout: 在这种类型中,队列将把接收到的消息广播给所有已知的队列,是‘mindless’的广播。使用此类型时basic_publish()的routing_key 参数会被忽略。

    • direct: 消息将发送到routing_key(生产者消息和exchange的key)和binding_key(队列和exchange的key)刚好匹配的队列。

      在direct类型中,如果key里有*或#,也不会启用模糊匹配。例如routing_key=*orange得消息只会匹配binding_key=*orange的队列。

    • topic: 该类型与direct类型相似。Topic Exchange路由规则没有Direct Exchange那么严格, 支持模糊匹配和多条件匹配,即该类型Exchange使用Routing Key模式匹配和字符串比较的方式将消息路由至绑定的Queue中。

      在队列被‘#’绑定时,exchange表现得像fanout,当队列没有用特殊字符‘#’或‘*’时,exchange表现得像direct。

    • headers: 该类型与direct类型相似。Headers Exchange使用Headers属性代替Routing Key进行路由匹配,在绑定Headers Exchange和Queue时,设置绑定属性的键值对;在向Headers Exchange发送消息时,设置消息的Headers属性键值对,使用消息Headers属性键值对和绑定属性键值对比较的方式将消息路由至绑定的Queue。

在queue_declare()中将queue参数置为空,可以让MQ随机产生一个队列名称。让exclusive=True可以让消费者连接断开后队列自动被删除。

  • Bindings

    binging是队列和exchange之间的关系。将n个队列binding到某个exchange上即可实现一处publish,多处subscribe的效果。

3. RPC(Remote Procedure Call)

使用RabbitMQ搭建RPC很容易,只需要两个队列:一个用于发送client的请求,另一个用于server向client返回reply。

使用RPC不当会写出不好维护的代码,当决定使用RPC时考虑如下三点:

  • 确保能明显看出哪个函数调用是本地的,哪个是远程的。
  • 给系统写文档,明确组件之间的依赖关系。
  • 准备错误备案,当RPC长时间停机时客户端应该如何反应。

有4种可能会经常使用的消息属性:

  • delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
  • content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.
  • reply_to: 通常用于命名callback queue。
  • correlation_id: 用于将RPC请求与响应关联起来。因为在client接收到response时他可能不知道对应哪个request,这时correlation_id就派上用场了。
  • 在client端应该对重复的response进行处理。可能出现如下情况:RPC刚给client发送完结果就挂了,并且还没来得及发送request的应答消息(ack),此时RPC会自动重启并再次处理request,从而又发了一次response。

整体工作流如下图所示:

  • 当client开始运行时,它会创建一个匿名且独占的 callback queue.
  • 对于每个RPC请求,client会发送一条带有两种属性的消息:reply_to属性和correlation_id属性。二者分别代表callback_queue和对应于该请求独一无二的值。
  • 请求被发往rpc_queue。
  • The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.
阿里云消息队列RabbitMQ版

阿里云是消息队列RabbitMQ版的服务器,并提供了一些管理用的API,例如exchange、queue、binding的创建和修改等,见API概览 - 消息队列RabbitMQ版 - 阿里云 (aliyun.com)。

有三种API调用方式:HTTP请求、Python SDK(Alibaba Cloud AMQP SDK for Python)和OpenAPI。

使用SDK调用API时首先要确保已经安装了阿里云核心SDK,还需安装云产品相应的SDK,在使用阿里云RabbitMQ时需要如下准备工作:

pip install aliyun-python-sdk-core
pip install aliyun-python-sdk-amqp-open

由于阿里云消息队列RabbitMQ版完全兼容开源的RabbitMQ(依靠公网IP、InstanceID、AccessKey等建立连接),因此具体的生产、消费等操作的编程方式与开源RabbitMQ保持一致。

  • 开源RabbitMQ迁移上云

    阿里云提供了将开源RabbitMQ集群迁移到云端的功能,即队列、exchange、binding等全部移植到云端,教程见开源RabbitMQ迁移上云概述 - 消息队列RabbitMQ版 - 阿里云 (aliyun.com)。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/467311.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号