concurrent.futures有一个简约的API。对于非常简单的问题,它很容易使用,但是您没有非常简单的问题。如果您这样做,那么您已经解决了;-)
您没有显示
multiprocessing.Pool您编写的任何代码,但这将是一个更有希望的起点-
假设您要解决问题多于希望确认如果仅您切换就必须容易做到的希望到较弱的API ;-)
继续使用的“明显”方法
multiprocessing是使用
Pool.apply_async()方法,将异步结果对象放在bounded上
Queue.Queue,并使主程序中的线程将其从中拉出,
Queue然后等待结果显示出来。这很容易,但这不是魔术。它解决了您的问题,因为有界
Queues是
在 以不同速度运行的生产者和消费者之间进行调解 的 规范方法。没有任何内容 可以 直接
concurrent.futures解决
该 问题,而这是“大量内存”问题的核心。
# Define global result_queue only in the main program.import Queueresult_queue = Queue.Queue(100) # pick a reasonable max size based on your problem# Run this in as many threads as you like.def consume_results(): while True: a = result_queue.get() if a is None: break output(a.get()) # `output()` is your function...# main program passes out work, after starting threadsfor i in range(1000): # the .put() will block so long as the queue is at its max size result_queue.put(pool.apply_async(calculate, args=(i,)))# add sentinels to let threads know they're donefor i in range(number_of_threads_you_started): result_queue.put(None)
这是您 需要 使生产者和消费者大致保持平衡的事情,而且任何标准库中都没有什么可以神奇地为您做到的。
编辑-充实它
这是一个完整的可执行示例,任何使用Python3的人都可以运行。笔记:
- 它不使用您的代码片段,因为这些代码片段依赖于外部数据库模块,并非每个人都可以运行。
- 它坚持要
concurrent.futures
同时管理进程和线程。它实际上multiprocessing
并没有那么难使用threading
,实际上,这里使用线程的 方式 直接使用起来会容易一些threading
。但是这种方式很明显。 - 一个
concurrent.futures
Future
对象基本上是同样的事情作为一个multiprocessing
异步结果对象- API的功能只是拼写不同。 - 您的问题并不简单,因为它具有可以以不同速度运行的多个阶段。同样,任何标准库中的任何事物都无法通过魔术掩盖潜在的不良后果。创建自己的有界队列仍然是最好的解决方案。对于任何合理的值,此处的内存使用量将保持适中
MAX_QUEUE_SIZE
。 - 通常 ,您所希望创建的CPU约束工作进程的数量少于可使用的内核数量。主程序还需要运行周期,操作系统也需要运行周期。
- 一旦习惯了这些内容,此代码中的所有注释都会令人讨厌,就像在代码行上看到注释“以1递增”
i += 1
;-)
这是代码:
import concurrent.futures as cfimport threadingimport queueNUM_CPUS = 3NUM_THREADS = 4MAX_QUEUE_SIZE = 20# Runs in worker processes.def producer(i): return i + 10def consumer(i): global total # We need to protect this with a lock because # multiple threads in the main program can # execute this function simultaneously. with sumlock: total += i# Runs in threads in main program.def consume_results(q): while True: future = q.get() if future is None: break else: consumer(future.result())if __name__ == "__main__": sumlock = threading.Lock() result_queue = queue.Queue(MAX_QUEUE_SIZE) total = 0 NUM_TO_DO = 1000 with cf.ThreadPoolExecutor(NUM_THREADS) as tp: # start the threads running `consume_results` for _ in range(NUM_THREADS): tp.submit(consume_results, result_queue) # start the worker processes with cf.ProcessPoolExecutor(NUM_CPUS) as pp: for i in range(NUM_TO_DO): # blocks until the queue size <= MAX_QUEUE_SIZE result_queue.put(pp.submit(producer, i)) # tell threads we're done for _ in range(NUM_THREADS): result_queue.put(None) print("got", total, "expected", (10 + NUM_TO_DO + 9) * NUM_TO_DO // 2)如果一切顺利,这是预期的输出:
got 509500 expected 509500



