Although you shouldn't use multiprocessing directly, it should be possible to safely combine asyncio and multiprocessing without too much trouble. The cardinal sin of asyncio (and of any other event-loop-based asynchronous framework) is blocking the event loop. If you try to use multiprocessing directly, any time you block waiting for a child process, you're going to block the event loop. Obviously, this is bad.
The simplest way to avoid this is to use BaseEventLoop.run_in_executor to execute a function in a concurrent.futures.ProcessPoolExecutor. ProcessPoolExecutor is a process pool implemented using multiprocessing.Process, but asyncio has built-in support for executing functions in it without blocking the event loop. Here's a simple example:
```python
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
    time.sleep(x)  # Pretend this is expensive calculations
    return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,))  # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
```
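On Python 3.5+ the same pattern reads more naturally with async/await; here's a minimal sketch of the example above in that style (asyncio.run requires 3.7+, and I've shortened the sleep so it runs quickly):

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
    time.sleep(0.5)  # pretend this is an expensive calculation
    return x * 5

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        # blocking_func runs in a worker process; await keeps the loop free.
        out = await loop.run_in_executor(executor, blocking_func, 2)
    print(out)
    return out

if __name__ == "__main__":
    asyncio.run(main())  # prints 10
```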
For most use cases, this functionality alone is sufficient. If you find yourself needing other constructs from multiprocessing, like Queue, Event, Manager, etc., there's a third-party library called aioprocessing (full disclosure: I wrote it) that provides asyncio-compatible versions of all the multiprocessing data structures. Here's an example demonstrating it:
```python
import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item + 5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1, 2, 3, 4, 5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None)  # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [
        # asyncio.async was renamed; ensure_future works on 3.4.4+
        asyncio.ensure_future(example(queue, event, lock)),
        asyncio.ensure_future(example2(queue, event, lock)),
    ]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
```
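Internally, aioprocessing provides those coro_* methods by running the blocking multiprocessing calls in a thread pool, so the event loop itself never blocks. Here's a minimal sketch of that idea using only the standard library (coro_get, worker, and consume are illustrative names, not aioprocessing's API):

```python
import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor()

async def coro_get(queue):
    """Await a blocking Queue.get() without stalling the event loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, queue.get)

def worker(queue):
    # Runs in a separate process; feeds results, then a sentinel.
    for i in range(3):
        queue.put(i * 2)
    queue.put(None)

async def consume(queue):
    results = []
    while True:
        item = await coro_get(queue)
        if item is None:  # sentinel: worker is done
            break
        results.append(item)
    return results

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    print(asyncio.run(consume(queue)))  # [0, 2, 4]
    p.join()
```

The thread pool only ever holds cheap waits (the actual work happens in the child process), which is why this doesn't reintroduce the GIL problems that motivated multiprocessing in the first place.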


