我相信,
TypeError来自
multiprocessing的
get。
我已经从脚本中删除了所有数据库代码。看看这个:
import multiprocessingimport sqlalchemy.excdef do(kwargs): i = kwargs['i'] print i raise sqlalchemy.exc.ProgrammingError("", {}, None) return ipool = multiprocessing.Pool(processes=5) # start 4 worker processesresults = []arglist = []for i in range(10): arglist.append({'i':i})r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously# Use get or wait?# r.get()r.wait()pool.close()pool.join()print results使用会
r.wait返回预期结果,但会使用
r.getraises
TypeError。如python文档中所述,请
r.wait在后面使用
map_async。
编辑 :我必须修改我以前的答案。我现在相信这些
TypeError来自SQLAlchemy。我已经修改了脚本以重现该错误。
编辑2
:问题似乎是,
multiprocessing.pool如果任何工作程序引发异常,而该异常的构造函数需要参数(请参见此处),则该问题将无法很好地发挥。
我修改了脚本以突出显示此内容。
import multiprocessingclass BadExc(Exception): def __init__(self, a): '''Non-optional param in the constructor.''' self.a = aclass GoodExc(Exception): def __init__(self, a=None): '''Optional param in the constructor.''' self.a = adef do(kwargs): i = kwargs['i'] print i raise BadExc('a') # raise GoodExc('a') return ipool = multiprocessing.Pool(processes=5)results = []arglist = []for i in range(10): arglist.append({'i':i})r = pool.map_async(do, arglist, callback=results.append)try: # set a timeout in order to be able to catch C-c r.get(1e100)except KeyboardInterrupt: passprint results在您的情况下,鉴于您的代码引发了SQLAlchemy异常,我想到的唯一解决方案是捕获
do函数中的所有异常,然后重新引发一个普通的异常
Exception。像这样:
import multiprocessingclass BadExc(Exception): def __init__(self, a): '''Non-optional param in the constructor.''' self.a = adef do(kwargs): try: i = kwargs['i'] print i raise BadExc('a') return i except Exception as e: raise Exception(repr(e))pool = multiprocessing.Pool(processes=5)results = []arglist = []for i in range(10): arglist.append({'i':i})r = pool.map_async(do, arglist, callback=results.append)try: # set a timeout in order to be able to catch C-c r.get(1e100)except KeyboardInterrupt: passprint results编辑3
:因此,这似乎是Python的错误,但是SQLAlchemy中的适当异常可以解决该问题:因此,我也提出了SQLAlchemy的问题。
作为解决该问题的方法,我认为 Edit 2 末尾的解决方案可以解决(在try-except中包装回调并重新引发)。



