栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

上下文管理器和多处理池

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

上下文管理器和多处理池

首先,这是一个非常好的问题!在深入研究

multiprocessing
代码后,我想我已经找到了一种方法:

当您启动时

multiprocessing.Pool
,该
Pool
对象会在内部
multiprocessing.Process
为池中的每个成员创建一个对象。当这些子流程启动时,它们会调用一个
_bootstrap
函数,如下所示:

def _bootstrap(self):    from . import util    global _current_process    try:        # ... (stuff we don't care about)        util._finalizer_registry.clear()        util._run_after_forkers()        util.info('child process calling self.run()')        try: self.run() exitpre = 0         finally: util._exit_function()        # ... (more stuff we don't care about)

run
方法是实际运行给
target
您的
Process
对象的方法。对于一个
Pool
进程,它是一个具有长时间运行的while循环的方法,该循环等待工作项通过内部队列进入。对我们真正有趣的是:被调用
之后 发生了什么。
self.run``util._exit_function()

事实证明,该函数进行了一些清理,听起来很像您要寻找的内容:

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,        active_children=active_children,        current_process=current_process):    # NB: we hold on to references to functions in the arglist due to the    # situation described below, where this function is called after this    # module's globals are destroyed.    global _exiting    info('process shutting down')    debug('running all "atexit" finalizers with priority >= 0')  # Very interesting!    _run_finalizers(0)

这是的文档字符串

_run_finalizers

def _run_finalizers(minpriority=None):    '''    Run all finalizers whose exit priority is not None and at least minpriority    Finalizers with highest priority are called first; finalizers with    the same priority will be called in reverse order of creation.    '''

该方法实际上遍历终结器回调的列表并执行它们:

items = [x for x in _finalizer_registry.items() if f(x)]items.sort(reverse=True)for key, finalizer in items:    sub_debug('calling %s', finalizer)    try:        finalizer()    except Exception:        import traceback        traceback.print_exc()

完善。那么我们如何进入

_finalizer_registry
?有一个未记录的对象
Finalize
multiprocessing.util
该对象负责向注册表添加回调:

class Finalize(object):    '''    Class which supports object finalization using weakrefs    '''    def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):        assert exitpriority is None or type(exitpriority) is int        if obj is not None: self._weakref = weakref.ref(obj, self)        else: assert exitpriority is not None        self._callback = callback        self._args = args        self._kwargs = kwargs or {}        self._key = (exitpriority, _finalizer_counter.next())        self._pid = os.getpid()        _finalizer_registry[self._key] = self  # That's what we're looking for!

好的,因此将它们放到一个示例中:

import multiprocessingfrom multiprocessing.util import Finalizeresource_cm = Noneresource = Noneclass Resource(object):    def __init__(self, args):        self.args = args    def __enter__(self):        print("in __enter__ of %s" % multiprocessing.current_process())        return self    def __exit__(self, *args, **kwargs):        print("in __exit__ of %s" % multiprocessing.current_process())def open_resource(args):    return Resource(args)def _worker_init(args):    global resource    print("calling init")    resource_cm = open_resource(args)    resource = resource_cm.__enter__()    # Register a finalizer    Finalize(resource, resource.__exit__, exitpriority=16)def hi(*args):    print("we're in the worker")if __name__ == "__main__":    pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))    pool.map(hi, range(pool._processes))    pool.close()    pool.join()

输出:

calling initin __enter__ of <Process(PoolWorker-1, started daemon)>calling initcalling initin __enter__ of <Process(PoolWorker-2, started daemon)>in __enter__ of <Process(PoolWorker-3, started daemon)>calling initin __enter__ of <Process(PoolWorker-4, started daemon)>we're in the workerwe're in the workerwe're in the workerwe're in the workerin __exit__ of <Process(PoolWorker-1, started daemon)>in __exit__ of <Process(PoolWorker-2, started daemon)>in __exit__ of <Process(PoolWorker-3, started daemon)>in __exit__ of <Process(PoolWorker-4, started daemon)>

如您所见

__exit__
,当我们共享资源时,我们所有的工作人员都会被调用
join()



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/651731.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号