我认为解决此问题的一种好方法是创建一个装饰器并使用该
Thread.join(timeout=seconds)方法。请记住,没有杀死线程的好方法,因此只要您的程序正在运行,它就会或多或少在后台继续运行。
首先,创建一个这样的装饰器:
from threading import Threadimport functoolsdef timeout(timeout): def deco(func): @functools.wraps(func) def wrapper(*args, **kwargs): res = [Exception('function [%s] timeout [%s seconds] exceeded!' % (func.__name__, timeout))] def newFunc(): try: res[0] = func(*args, **kwargs) except Exception as e: res[0] = e t = Thread(target=newFunc) t.daemon = True try: t.start() t.join(timeout) except Exception as je: print ('error starting thread') raise je ret = res[0] if isinstance(ret, baseException): raise ret return ret return wrapper return deco然后,执行以下操作:
func = timeout(timeout=16)(MyModule.MyFunc)try: func()except: pass #handle errors here
您可以在需要的任何地方使用此装饰器,例如:
@timeout(60)def f(): ...



