栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

如何在Python中的函数中添加超时

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

如何在Python中的函数中添加超时

这个问题是在9年前提出的,从那以后,Python和我的经验清单都发生了很大变化。在查看了标准库中的其他API并希望部分复制其中一个API之后,编写了follow模块以达到与问题中发布的API相似的目的。

异步

#! /usr/bin/env python3import _threadimport abc as _abcimport collections as _collectionsimport enum as _enumimport math as _mathimport multiprocessing as _multiprocessingimport operator as _operatorimport queue as _queueimport signal as _signalimport sys as _sysimport time as _time__all__ = (    'Executor',    'get_timeout',    'set_timeout',    'submit',    'map_',    'shutdown')class _base(metaclass=_abc.ABCmeta):    __slots__ = (        '__timeout',    )    @_abc.abstractmethod    def __init__(self, timeout):        self.timeout = _math.inf if timeout is None else timeout    def get_timeout(self):        return self.__timeout    def set_timeout(self, value):        if not isinstance(value, (float, int)): raise TypeError('value must be of type float or int')        if value <= 0: raise ValueError('value must be greater than zero')        self.__timeout = value    timeout = property(get_timeout, set_timeout)def _run_and_catch(fn, args, kwargs):    # noinspection PyPep8,PyBroadException    try:        return False, fn(*args, **kwargs)    except:        return True, _sys.exc_info()[1]def _run(fn, args, kwargs, queue):    queue.put_nowait(_run_and_catch(fn, args, kwargs))class _State(_enum.IntEnum):    PENDING = _enum.auto()    RUNNING = _enum.auto()    CANCELLED = _enum.auto()    FINISHED = _enum.auto()    ERROR = _enum.auto()def _run_and_catch_loop(iterable, *args, **kwargs):    exception = None    for fn in iterable:        error, value = _run_and_catch(fn, args, kwargs)        if error: exception = value    if exception:        raise exceptionclass _Future(_base):    __slots__ = (        '__queue',        '__process',        '__start_time',        '__callbacks',        '__result',        '__mutex'    )    def __init__(self, timeout, fn, args, kwargs):        super().__init__(timeout)        self.__queue = _multiprocessing.Queue(1)        self.__process = _multiprocessing.Process( target=_run, args=(fn, args, kwargs, self.__queue), daemon=True        )        self.__start_time = _math.inf        self.__callbacks = _collections.deque()        self.__result = True, TimeoutError()        self.__mutex = _thread.allocate_lock()    @property    def __state(self):        pid, exitpre = self.__process.pid, self.__process.exitpre        return (_State.PENDING if pid is None else     _State.RUNNING if exitpre is None else     _State.CANCELLED if exitpre == -_signal.SIGTERM else     _State.FINISHED if exitpre == 0 else     _State.ERROR)    def __repr__(self):        root = f'{type(self).__name__} at {id(self)} state={self.__state.name}'        if self.__state < _State.CANCELLED: return f'<{root}>'        error, value = self.__result        suffix = f'{"raised" if error else "returned"} {type(value).__name__}'        return f'<{root} {suffix}>'    def __consume_callbacks(self):        while self.__callbacks: yield self.__callbacks.popleft()    def __invoke_callbacks(self):        self.__process.join()        _run_and_catch_loop(self.__consume_callbacks(), self)    def cancel(self):        self.__process.terminate()        self.__invoke_callbacks()    def __auto_cancel(self):        elapsed_time = _time.perf_counter() - self.__start_time        if elapsed_time > self.timeout: self.cancel()        return elapsed_time    def cancelled(self):        self.__auto_cancel()        return self.__state is _State.CANCELLED    def running(self):        self.__auto_cancel()        return self.__state is _State.RUNNING    def done(self):        self.__auto_cancel()        return self.__state > _State.RUNNING    def __handle_result(self, error, value):        self.__result = error, value        self.__invoke_callbacks()    def __ensure_termination(self):        with self.__mutex: elapsed_time = self.__auto_cancel() if not self.__queue.empty():     self.__handle_result(*self.__queue.get_nowait()) elif self.__state < _State.CANCELLED:     remaining_time = self.timeout - elapsed_time     if remaining_time == _math.inf:         remaining_time = None     try:         result = self.__queue.get(True, remaining_time)     except _queue.Empty:         self.cancel()     else:         self.__handle_result(*result)    def result(self):        self.__ensure_termination()        error, value = self.__result        if error: raise value        return value    def exception(self):        self.__ensure_termination()        error, value = self.__result        if error: return value    def add_done_callback(self, fn):        if self.done(): fn(self)        else: self.__callbacks.append(fn)    def _set_running_or_notify_cancel(self):        if self.__state is _State.PENDING: self.__process.start() self.__start_time = _time.perf_counter()        else: self.cancel()class Executor(_base):    __slots__ = (        '__futures',    )    def __init__(self, timeout=None):        super().__init__(timeout)        self.__futures = set()    def submit(self, fn, *args, **kwargs):        future = _Future(self.timeout, fn, args, kwargs)        self.__futures.add(future)        future.add_done_callback(self.__futures.remove)        # noinspection PyProtectedMember        future._set_running_or_notify_cancel()        return future    @staticmethod    def __cancel_futures(iterable):        _run_and_catch_loop(map(_operator.attrgetter('cancel'), iterable))    def map(self, fn, *iterables):        futures = tuple(self.submit(fn, *args) for args in zip(*iterables))        def result_iterator(): future_iterator = iter(futures) try:     for future in future_iterator:         yield future.result() finally:     self.__cancel_futures(future_iterator)        return result_iterator()    def shutdown(self):        self.__cancel_futures(frozenset(self.__futures))    def __enter__(self):        return self    def __exit__(self, exc_type, exc_val, exc_tb):        self.shutdown()        return False_executor = Executor()get_timeout = _executor.get_timeoutset_timeout = _executor.set_timeoutsubmit = _executor.submitmap_ = _executor.mapshutdown = _executor.shutdowndel _executor


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651286.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号