栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

有序打印的Python多处理子流程?

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

有序打印的Python多处理子流程?

如您所知,在这种情况下使用锁会杀死多进程,因为您实际上会让所有进程都等待当前持有STDOUT的“权限”的进程释放互斥量。但是,从逻辑上讲,并行运行并与功能/子流程同步打印是排他的。

相反,您可以做的是让主流程充当子流程的“打印机”,这样子流程一旦完成/出现错误,然后然后才将要打印的内容发送回主流程。对于似乎不是“实时”的打印,您似乎是非常满意的(无论如何也不能如前所述),因此该方法应该恰好为您服务。所以:

import multiprocessing as mpimport random  # just to add some randomnessfrom time import sleepdef foo(x):    output = ["[Process {}]: foo:".format(x)]    for i in range(5):        output.append('[Process {}] in foo {}'.format(x, i))        sleep(0.2 + 1 * random.random())    return "n".join(output)if __name__ == '__main__':    pool = mp.Pool(4)    for res in pool.imap_unordered(foo, range(4)):        print("[MAIN]: Process finished, response:")        print(res)  # this will print as soon as one of the processes finishes/errors    pool.close()

这会给你(当然是YMMV):

[MAIN]: Process finished, response:[Process 2]: foo:[Process 2] in foo 0[Process 2] in foo 1[Process 2] in foo 2[Process 2] in foo 3[Process 2] in foo 4[MAIN]: Process finished, response:[Process 0]: foo:[Process 0] in foo 0[Process 0] in foo 1[Process 0] in foo 2[Process 0] in foo 3[Process 0] in foo 4[MAIN]: Process finished, response:[Process 1]: foo:[Process 1] in foo 0[Process 1] in foo 1[Process 1] in foo 2[Process 1] in foo 3[Process 1] in foo 4[MAIN]: Process finished, response:[Process 3]: foo:[Process 3] in foo 0[Process 3] in foo 1[Process 3] in foo 2[Process 3] in foo 3[Process 3] in foo 4

您可以观察到其他任何东西,包括以同样的方式出现错误。

更新 -如果您绝对必须使用其输出无法控制的功能,则可以包装子流程并捕获其STDOUT /
STDERR,然后一旦完成(或引发异常),就可以将所有内容返回给流程’manager ‘打印到实际的STDOUT。通过这样的设置,我们可以

foo()
像:

def foo(x):    print("[Process {}]: foo:".format(x))    for i in range(5):        print('[Process {}] in foo {}'.format(x, i))        sleep(0.2 + 1 * random.random())        if random.random() < 0.0625:  # let's add a 1/4 chance to err: raise Exception("[Process {}] A random exception is random!".format(x))    return random.random() * 100  # just a random response, you can omit it

请注意,它非常高兴地不知道有什么东西试图破坏其操作模式。然后,我们将创建一个外部通用包装器(这样您就不必依赖于功能对其进行更改)实际上就 弄乱
了它的默认行为(不仅是这个函数,还有它在运行时可能调用的所有其他东西):

def std_wrapper(args):    try:        from StringIO import StringIO  # ... for Python 2.x compatibility    except importError:        from io import StringIO    import sys    sys.stdout, sys.stderr = StringIO(), StringIO()  # replace stdout/err with our buffers    # args is a list packed as: [0] process function name; [1] args; [2] kwargs; lets unpack:    process_name = args[0]    process_args = args[1] if len(args) > 1 else []    process_kwargs = args[2] if len(args) > 2 else {}    # get our method from its name, assuming global namespace of the current module/script    process = globals()[process_name]    response = None  # in case a call fails    try:        response = process(*process_args, **process_kwargs)  # call our process function    except Exception as e:  # too broad but good enough as an example        print(e)    # rewind our buffers:    sys.stdout.seek(0)    sys.stderr.seek(0)    # return everything packed as STDOUT, STDERR, PROCESS_RESPonSE | NONE    return sys.stdout.read(), sys.stderr.read(), response

现在,我们需要的是调用此包装器而不是所需的包装器

foo()
,并为其提供有关代表我们调用的信息:

if __name__ == '__main__':    pool = mp.Pool(4)    # since we're wrapping the process we're calling, we need to send to the wrapper packed    # data with instructions on what to call on our behalf.    # info on args packing available in the std_wrapper function above.    for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))        print(out.rstrip())  # remove the trailing space for niceness, print err if you want    pool.close()

因此,如果您现在运行它,将会得到以下内容:

[MAIN]: Process finished, response: None, STDOUT:[Process 2]: foo:[Process 2] in foo 0[Process 2] in foo 1[Process 2] A random exception is random![MAIN]: Process finished, response: 87.9658471743586, STDOUT:[Process 1]: foo:[Process 1] in foo 0[Process 1] in foo 1[Process 1] in foo 2[Process 1] in foo 3[Process 1] in foo 4[MAIN]: Process finished, response: 38.929554421661194, STDOUT:[Process 3]: foo:[Process 3] in foo 0[Process 3] in foo 1[Process 3] in foo 2[Process 3] in foo 3[Process 3] in foo 4[MAIN]: Process finished, response: None, STDOUT:[Process 0]: foo:[Process 0] in foo 0[Process 0] in foo 1[Process 0] in foo 2[Process 0] in foo 3[Process 0] in foo 4[Process 0] A random exception is random!

尽管

foo()
只是打印掉或出错。当然,您可以使用此类包装器来调用任何函数,并将任何数量的args / kwargs传递给该函数。

更新#2- 但是,等一下!如果我们可以像这样包装我们的函数过程,并捕获它们的STDOUT /
STDERR,我们肯定可以将其变成装饰器,并通过简单的装饰将其用于代码中。因此,对于我的最终建议:

import functoolsimport multiprocessingimport random  # just to add some randomnessimport timedef std_wrapper(func):    @functools.wraps(func)  # we need this to unravel the target function name    def caller(*args, **kwargs):  # and now for the wrapper, nothing new here        try: from StringIO import StringIO  # ... for Python 2.x compatibility        except importError: from io import StringIO        import sys        sys.stdout, sys.stderr = StringIO(), StringIO()  # use our buffers instead        response = None  # in case a call fails        try: response = func(*args, **kwargs)  # call our wrapped process function        except Exception as e:  # too broad but good enough as an example print(e)  # NOTE: the exception is also printed to the captured STDOUT        # rewind our buffers:        sys.stdout.seek(0)        sys.stderr.seek(0)        # return everything packed as STDOUT, STDERR, PROCESS_RESPonSE | NONE        return sys.stdout.read(), sys.stderr.read(), response    return caller@std_wrapper  # decorate any function, it won't know you're siphoning its STDOUT/STDERRdef foo(x):    print("[Process {}]: foo:".format(x))    for i in range(5):        print('[Process {}] in foo {}'.format(x, i))        time.sleep(0.2 + 1 * random.random())        if random.random() < 0.0625:  # let's add a 1/4 chance to err: raise Exception("[Process {}] A random exception is random!".format(x))    return random.random() * 100  # just a random response, you can omit it

现在我们可以像以前一样调用包装函数了,而无需处理参数打包或任何类似的事情,因此我们回到了:

if __name__ == '__main__':    pool = multiprocessing.Pool(4)    for out, err, res in pool.imap_unordered(foo, range(4)):        print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))        print(out.rstrip())  # remove the trailing space for niceness, print err if you want    pool.close()

输出与前面的示例相同,但包装更好,更易于管理。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/661127.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号