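First of all, this is a really great question! After digging around in the `multiprocessing` code, I think I've found a way to do this.

When you start a `multiprocessing.Pool`, the `Pool` object internally creates a `multiprocessing.Process` object for each member of the pool. When those subprocesses start up, they call a `_bootstrap` function, which looks like this: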

    def _bootstrap(self):
        from . import util
        global _current_process
        try:
            # ... (stuff we don't care about)
            util._finalizer_registry.clear()
            util._run_after_forkers()
            util.info('child process calling self.run()')
            try:
                self.run()
                exitcode = 0
            finally:
                util._exit_function()
            # ... (more stuff we don't care about)

The `run` method is what actually runs the `target` you gave the `Process` object. For a `Pool` process, that is a method with a long-running `while` loop that waits for work items to come in over an internal queue. What's really interesting for us is what happens *after* `self.run` finishes: `util._exit_function()` is called.
As it turns out, that function does some cleanup that sounds a lot like what you're looking for:

    def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
                       active_children=active_children,
                       current_process=current_process):
        # NB: we hold on to references to functions in the arglist due to the
        # situation described below, where this function is called after this
        # module's globals are destroyed.

        global _exiting

        info('process shutting down')
        debug('running all "atexit" finalizers with priority >= 0')  # Very interesting!
        _run_finalizers(0)

Here is the docstring of `_run_finalizers`:

    def _run_finalizers(minpriority=None):
        '''
        Run all finalizers whose exit priority is not None and at least minpriority

        Finalizers with highest priority are called first; finalizers with
        the same priority will be called in reverse order of creation.
        '''

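The ordering rule in that docstring can be checked directly in a single process. The following is a minimal sketch, not part of the original investigation: `finalizer_call_order` is a hypothetical helper name, and both `Finalize` and `_run_finalizers` are undocumented internals of `multiprocessing.util`, so their behavior may change between Python versions.

```python
from multiprocessing import util


def finalizer_call_order():
    """Register three finalizers and return the order in which they ran."""
    calls = []
    # obj=None means there is no object to track, which requires an exitpriority.
    util.Finalize(None, calls.append, args=("priority 1",), exitpriority=1)
    util.Finalize(None, calls.append, args=("priority 5, created first",),
                  exitpriority=5)
    util.Finalize(None, calls.append, args=("priority 5, created second",),
                  exitpriority=5)
    # Private helper: runs every pending finalizer with priority >= 0,
    # including any that other code in this process has registered.
    util._run_finalizers(0)
    return calls


if __name__ == "__main__":
    for label in finalizer_call_order():
        print(label)
```

The higher priority runs first, and within priority 5 the finalizer created last runs first, which matches the "reverse order of creation" wording.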
The method actually runs through a list of finalizer callbacks and executes them:

    items = [x for x in _finalizer_registry.items() if f(x)]
    items.sort(reverse=True)

    for key, finalizer in items:
        sub_debug('calling %s', finalizer)
        try:
            finalizer()
        except Exception:
            import traceback
            traceback.print_exc()

Perfect. So how do we get into the `_finalizer_registry`? There is an undocumented object called `Finalize` in `multiprocessing.util` that is responsible for adding a callback to the registry:

    class Finalize(object):
        '''
        Class which supports object finalization using weakrefs
        '''
        def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
            assert exitpriority is None or type(exitpriority) is int

            if obj is not None:
                self._weakref = weakref.ref(obj, self)
            else:
                assert exitpriority is not None

            self._callback = callback
            self._args = args
            self._kwargs = kwargs or {}
            self._key = (exitpriority, _finalizer_counter.next())
            self._pid = os.getpid()

            _finalizer_registry[self._key] = self  # That's what we're looking for!

OK, so putting it all together into an example:

    import multiprocessing
    from multiprocessing.util import Finalize

    resource_cm = None
    resource = None


    class Resource(object):
        def __init__(self, args):
            self.args = args

        def __enter__(self):
            print("in __enter__ of %s" % multiprocessing.current_process())
            return self

        def __exit__(self, *args, **kwargs):
            print("in __exit__ of %s" % multiprocessing.current_process())


    def open_resource(args):
        return Resource(args)


    def _worker_init(args):
        # Runs once in each worker process when the pool starts it.
        global resource
        print("calling init")
        resource_cm = open_resource(args)
        resource = resource_cm.__enter__()
        # Register a finalizer
        Finalize(resource, resource.__exit__, exitpriority=16)


    def hi(*args):
        print("we're in the worker")


    if __name__ == "__main__":
        pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
        pool.map(hi, range(pool._processes))
        pool.close()
        pool.join()

Output:

    calling init
    in __enter__ of <Process(PoolWorker-1, started daemon)>
    calling init
    calling init
    in __enter__ of <Process(PoolWorker-2, started daemon)>
    in __enter__ of <Process(PoolWorker-3, started daemon)>
    calling init
    in __enter__ of <Process(PoolWorker-4, started daemon)>
    we're in the worker
    we're in the worker
    we're in the worker
    we're in the worker
    in __exit__ of <Process(PoolWorker-1, started daemon)>
    in __exit__ of <Process(PoolWorker-2, started daemon)>
    in __exit__ of <Process(PoolWorker-3, started daemon)>
    in __exit__ of <Process(PoolWorker-4, started daemon)>

As you can see, `__exit__` gets called in all of our workers when we `join()` the pool.
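Since every `multiprocessing.Process` child goes through `_bootstrap`, the same hook should not be specific to `Pool`. Here is a rough sketch with a plain `Process`, under some loud assumptions: it requires a POSIX system where the `fork` start method is available, the names `_target` and `run_child_with_finalizer` are made up for illustration, and `Finalize` remains an undocumented internal.

```python
import multiprocessing
from multiprocessing import util


def _target(conn):
    # Runs in the child. Register a callback that fires at process shutdown;
    # obj=None means no object is tracked, so an exitpriority is required.
    util.Finalize(None, conn.send, args=("finalizer ran",), exitpriority=16)
    conn.send("target ran")


def run_child_with_finalizer():
    """Start one child and return the messages it sent, in order."""
    # Assumption: the "fork" start method exists (not the case on Windows).
    ctx = multiprocessing.get_context("fork")
    parent_conn, child_conn = ctx.Pipe()
    child = ctx.Process(target=_target, args=(child_conn,))
    child.start()
    child.join()
    # Both messages are small, so they are already buffered in the pipe.
    return [parent_conn.recv(), parent_conn.recv()]


if __name__ == "__main__":
    print(run_child_with_finalizer())
```

The second message is sent only after `_target` has returned, that is, from the `util._exit_function()` call in the child's `_bootstrap`.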



