在阅读@Martijn
Pieters的文章后,我最初删除了此答案,因为他更详细地描述了“为什么这不起作用”以及更早的版本。然后我意识到,OP的示例中的用例并不完全符合
“如何使用multiprocessing.Queue.get方法”。
这不是因为没有用于演示的子进程,而是因为在实际的应用程序中,几乎没有预先填充队列,而是仅在队列之后才读出,而是在等待时间之间交错读写。Martijn演示的扩展演示代码在通常情况下不起作用,因为当入队不符合阅读要求时,while循环可能会中断得太早。因此,这里是重新加载的答案,它能够处理通常的交错提要和阅读方案:
不要依赖queue.empty检查同步。
将对象放在空队列上之后,在队列的empty()方法返回False且get_nowait()可以在不引发queue.Empty的情况下返回之前,可能会有无穷的延迟。…
空()
如果队列为空,则返回True,否则返回False。由于多线程/多处理语义,这是不可靠的。docs
从队列中使用
for msg in iter(queue.get, sentinel):to
.get(),您都可以通过传递哨兵值来中断循环…
iter(callable,sentinel)?
from multiprocessing import QueueSENTINEL = Noneif __name__ == '__main__': queue = Queue() for i in [*range(3), SENTINEL]: queue.put(i) for msg in iter(queue.get, SENTINEL): print(msg)
…或在需要非阻塞解决方案时使用
get_nowait()并处理可能的
queue.Empty异常。
from multiprocessing import Queuefrom queue import Emptyimport timeSENTINEL = Noneif __name__ == '__main__': queue = Queue() for i in [*range(3), SENTINEL]: queue.put(i) while True: try: msg = queue.get_nowait() if msg == SENTINEL: break print(msg) except Empty: # do other stuff time.sleep(0.1)
如果只有一个进程并且该进程中只有一个线程正在读取队列,则还可以与以下交换最后一个代码片段:
while True: if not queue.empty(): # this is not an atomic operation ... msg = queue.get() # ... thread could be interrupted in between if msg == SENTINEL: break print(msg) else: # do other stuff time.sleep(0.1)
由于线程可能会在检查和之间删除GIL,因此这不适用于进程中的多线程队列读取。如果从队列中读取多个进程,则同样适用。
ifnot queue.empty()``queue.get()
对于单一生产者/单一消费者的方案,使用a
multiprocessing.Pipe代替
multiprocessing.Queue将是足够的,并且性能更高。



