signal的文档中描述了用于使操作超时的过程。
基本思想是使用信号处理程序在某个时间间隔内设置警报,并在计时器到期后引发异常。
请注意,这仅适用于UNIX。
这是一个创建装饰器的实现(将以下代码另存为
timeout.py)。
from functools import wrapsimport errnoimport osimport signalclass TimeoutError(Exception): passdef timeout(seconds=10, error_message=os.strerror(errno.ETIME)): def decorator(func): def _handle_timeout(signum, frame): raise TimeoutError(error_message) def wrapper(*args, **kwargs): signal.signal(signal.SIGALRM, _handle_timeout) signal.alarm(seconds) try: result = func(*args, **kwargs) finally: signal.alarm(0) return result return wraps(func)(wrapper) return decorator
这将创建一个称为的装饰器
@timeout,该装饰器可应用于任何长时间运行的函数。
因此,在你的应用程序代码中,你可以像这样使用装饰器:
from timeout import timeout# Timeout a long running function with the default expiry of 10 seconds.@timeoutdef long_running_function1(): ...# Timeout after 5 seconds@timeout(5)def long_running_function2(): ...# Timeout after 30 seconds, with the error "Connection timed out"@timeout(30, os.strerror(errno.ETIMEDOUT))def long_running_function3(): ...



