这里有两个问题:首先,如何异步运行阻塞代码,其次,如何并行运行异步代码(异步是单线程的,因此GIL仍然适用,因此不是 真正的 并发,但我离题了。
可以使用asyncio.ensure_future创建并行任务,如此处所述。
要运行同步代码,您将需要在executor中运行阻塞代码。例:
import concurrent.futuresimport asyncioimport timedef blocking(delay): time.sleep(delay) print('Completed.')async def non_blocking(loop, executor): # Run three of the blocking tasks concurrently. asyncio.wait will # automatically wrap these in Tasks. If you want explicit access # to the tasks themselves, use asyncio.ensure_future, or add a # "done, pending = asyncio.wait..." assignment await asyncio.wait( fs={ # Returns after delay=12 seconds loop.run_in_executor(executor, blocking, 12), # Returns after delay=14 seconds loop.run_in_executor(executor, blocking, 14), # Returns after delay=16 seconds loop.run_in_executor(executor, blocking, 16) }, return_when=asyncio.ALL_COMPLETED )loop = asyncio.get_event_loop()executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)loop.run_until_complete(non_blocking(loop, executor))如果要使用for循环调度这些任务(如您的示例中所示),则有几种不同的策略,但是基本方法是使用for循环(或列表理解等) 调度
任务,并通过asyncio等待它们。等待, 然后 检索结果。例:
done, pending = await asyncio.wait( fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps], return_when=asyncio.ALL_COMPLETED)# Note that any errors raise during the above will be raised here; to# handle errors you will need to call task.exception() and check if it# is not None before calling task.result()results = [task.result() for task in done]



