工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反,我们安排任务稍后完成。我们将一个任务封装成一条消息并发送到队列中。在后台运行的工作进程将弹出任务并最终执行作业。当您运行许多工人时,任务将在他们之间共享。这个概念在 web 应用程序中特别有用,因为在短的HTTP请求窗口期间不可能处理复杂的任务。
在教程一中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有现实世界的任务,比如要调整大小的图像或要渲染的 pdf 文件,所以让我们假装我们很忙来延迟它——通过使用time.sleep()函数。我们将把字符串中的点数作为它的复杂度;每个点将占一秒钟的“工作”。例如,Hello...描述的假任务需要三秒钟。
我们将稍微修改之前示例中的send.py代码,以允许从命令行发送任意消息。该程序会将任务安排到我们的工作队列中,因此我们将其命名为 new_task.py:
import pika,sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(" [x] Sent 'Hello World!'")
connection.close()
我们旧的receive.py脚本也需要一些更改:它需要为消息正文中的每个点伪造一秒钟的工作。它将从队列中弹出消息并执行任务,所以我们称之为worker.py:
import pika, sys, os
import time
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.'))
print(" [x] Done")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
1.1 循环调度
使用任务队列的优势之一是能够轻松并行化工作。如果我们正在建立积压的工作,我们可以添加更多的工作人员,这样就可以轻松扩展。
首先,让我们尝试同时运行两个worker.py脚本。他们都会从队列中获取消息,但究竟是如何呢?让我们来看看。您需要打开三个控制台。两个将运行worker.py 脚本。这些控制台将成为我们的两个消费者——C1 和 C2。
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C
在第三个中,我们将发布新任务。启动消费者后,您可以发布一些消息:
# shell 3 python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message.....
让我们看看交付给消费者的是什么:
# shell 1 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
# shell 2 python worker.py # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'
默认情况下,RabbitMQ将按顺序把每条消息发送给下一个消费者。正常来说,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询.
1.2 消息确认完成一项任务可能需要几秒钟甚至很长时间,您可能想知道如果其中一个消费者开始了一项长期任务并且只完成了部分任务而挂掉了,会发生什么。使用我们当前的代码,一旦RabbitMQ将消息传递给消费者,它就会立即将其标记为删除。在这种情况下,如果你杀死一个工人,我们将丢失它刚刚处理的消息。我们还将丢失所有已分派给该特定工作人员但尚未处理的消息。
但我们不想丢失任何任务。如果一个工人死了,我们希望将任务交给另一个工人。为了确保消息永远不会丢失,RabbitMQ支持消息确认 。消费者发回确认信息以告诉RabbitMQ已接收、处理特定消息,并且RabbitMQ可以自由删除它。如果消费者在没有发送 ack 的情况下死亡(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ将理解消息未完全处理并将重新排队。如果有其他消费者同时在线,它会迅速将其重新交付给另一个消费者。这样您就可以确保不会丢失任何消息,即使工作人员偶尔会死亡。
超时(默认为 30 分钟)对消费者交付确认强制执行。这有助于检测从不确认交付的有问题(卡住)的消费者。
默认情况下启用手动确认消息。在前面的例子中,我们通过auto_ack=True 标志明确地关闭了它们。一旦我们完成了一项任务,是时候移除这个标志并向工作人员发送适当的确认了。
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.') )
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
使用此代码,我们可以确保即使您在处理消息时使用 CTRL+C 杀死工作人员,也不会丢失任何内容。工人死后不久,所有未确认的消息都将重新传递。必须在接收交付的同一通道上发送确认。尝试使用不同的通道进行确认将导致通道级协议异常。
1.3 消息持久性
我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止,我们的任务仍然会丢失。当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要。需要做两件事来确保消息不会丢失:我们需要将队列和消息都标记为持久的。
- 首先,我们需要确保队列能够在RabbitMQ节点重启后继续存在。为此,我们需要将其声明为持久的:
channel.queue_declare(queue='hello', durable=True)
虽然这个命令本身是正确的,但它在我们的设置中不起作用。那是因为我们已经定义了一个名为hello的队列 ,它不是持久的。RabbitMQ不允许您使用不同的参数重新定义现有队列,并且会向任何尝试这样做的程序返回错误。但是有一个快速的解决方法,那就是声明一个具有不同名称的队列,例如task_queue:
channel.queue_declare(queue='task_queue', durable=True)
此queue_declare方法更改需要同时应用于生产者和消费者代码。这样修改过后的话,即使 RabbitMQ重启,task_queue队列也不会丢失。
- 现在我们需要将消息标记为持久的——通过提供一个值为2的delivery_mode属性
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
注意:将消息标记为持久性并不能完全保证消息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,但是当RabbitMQ已经接受一条消息并且还没有保存它时,仍然有一个很短的时间窗口。此外,RabbitMQ不会对每条消息都执行fsync(2) —— 它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不强,但对于我们简单的任务队列来说已经足够了。
1.4 公平调度您可能已经注意到调度仍然不能完全按照我们想要的方式工作。例如,在有两个worker的情况下,当所有奇数消息都很重,偶数消息很轻时,一个worker会一直很忙,而另一个worker几乎不做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。
发生这种情况是因为RabbitMQ只是在消息进入队列时分派消息。它不考虑消费者未确认消息的数量。它只是盲目地将第 n 条消息分派给第 n 条消费者。
为了解决这个问题,我们可以使用带有prefetch_count=1设置的Channel#basic_qos通道方法 。这使用basic.qos协议方法告诉 RabbitMQ 一次不要给一个工人分配一条以上的消息。或者,换句话说,在处理并确认前一条消息之前,不要向工作人员发送新消息。相反,它会将它分派给下一个不忙的工人
channel.basic_qos(prefetch_count= 1 )
那么new_task.py文件的整体代码为:
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
))
print(" [x] Sent %r" % message)
connection.close()
work.py文件的整体代码为:
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()



