栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

如何使用asyncio和current.futures.ProcessPoolExecutor终止Python中长时间运行的计算(受CPU约束的任务)?

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何使用asyncio和current.futures.ProcessPoolExecutor终止Python中长时间运行的计算(受CPU约束的任务)?

如何在方法中终止长时间运行的CPU绑定计算?

您尝试的方法不起作用,因为所返回的期货

ProcessPoolExecutor
无法取消。虽然ASYNCIO的
run_in_executor

尝试传播的取消,这只不过是忽略由
Future.cancel
一次任务开始执行。

没有任何根本原因。与线程不同,可以安全地终止进程,因此完全有可能

ProcessPoolExecutor.submit
返回
cancel
终止相应进程的将来。Asyncio协程定义了取消语义,并会自动使用它。不幸的是,
ProcessPoolExecutor.submit
返回一个常规
concurrent.futures.Future
,该常规假定最低公分母,并且将运行中的未来视为不可触及。

结果,要取消在子流程中执行的任务,必须

ProcessPoolExecutor
完全规避并管理自己的流程。面临的挑战是如何在不重新实现一半的情况下执行此操作
multiprocessing
。标准库提供的一个选项就是
multiprocessing.Pool
为此目的(滥用),因为它支持可靠地关闭工作进程。A
CancellablePool
可以按以下方式工作:

  • 与其生成固定数量的进程,不如生成固定数量的1-worker池。
  • 从异步协程向池分配任务。如果在等待另一个进程中的任务完成时取消协程,请终止单进程池并创建一个新池。
  • 由于所有内容都是由单个asyncio线程协调的,因此不必担心争用情况,例如意外杀死已经开始执行另一任务的进程。(如果要支持中的取消,则需要防止这种情况
    ProcessPoolExecutor
    。)

这是该想法的示例实现:

import asyncioimport multiprocessingclass CancellablePool:    def __init__(self, max_workers=3):        self._free = {self._new_pool() for _ in range(max_workers)}        self._working = set()        self._change = asyncio.Event()    def _new_pool(self):        return multiprocessing.Pool(1)    async def apply(self, fn, *args):        """        Like multiprocessing.Pool.apply_async, but:         * is an asyncio coroutine         * terminates the process if cancelled        """        while not self._free: await self._change.wait() self._change.clear()        pool = usable_pool = self._free.pop()        self._working.add(pool)        loop = asyncio.get_event_loop()        fut = loop.create_future()        def _on_done(obj): loop.call_soon_threadsafe(fut.set_result, obj)        def _on_err(err): loop.call_soon_threadsafe(fut.set_exception, err)        pool.apply_async(fn, args, callback=_on_done, error_callback=_on_err)        try: return await fut        except asyncio.CancelledError: pool.terminate() usable_pool = self._new_pool()        finally: self._working.remove(pool) self._free.add(usable_pool) self._change.set()    def shutdown(self):        for p in self._working | self._free: p.terminate()        self._free.clear()

显示取消的简约测试用例:

def really_long_process():    print("I am a really long computation.....")    large_val = 9729379273492397293479237492734 ** 344323    print("I finally computed this large value: {}".format(large_val))async def main():    loop = asyncio.get_event_loop()    pool = CancellablePool()    tasks = [loop.create_task(pool.apply(really_long_process))  for _ in range(5)]    for t in tasks:        try: await asyncio.wait_for(t, 1)        except asyncio.TimeoutError: print('task timed out and cancelled')    pool.shutdown()asyncio.get_event_loop().run_until_complete(main())

请注意,CPU使用率从未超过3个内核,并且在测试即将结束时它如何开始下降,表明进程已按预期终止。

要将其应用于问题的代码,请创建

self._lmz_executor
一个实例
CancellablePool
并将其更改
self._loop.run_in_executor(...)
self._loop.create_task(self._lmz_executor.apply(...))



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/625304.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号