栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

处理多个进程中的单个文件

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

处理多个进程中的单个文件

您正在寻找的是生产者/消费者模式

基本线程示例

这是使用线程模块的基本示例(而不是多处理)

import threadingimport Queueimport sysdef do_work(in_queue, out_queue):    while True:        item = in_queue.get()        # process        result = item        out_queue.put(result)        in_queue.task_done()if __name__ == "__main__":    work = Queue.Queue()    results = Queue.Queue()    total = 20    # start for workers    for i in xrange(4):        t = threading.Thread(target=do_work, args=(work, results))        t.daemon = True        t.start()    # produce data    for i in xrange(total):        work.put(i)    work.join()    # get the results    for i in xrange(total):        print results.get()    sys.exit()

您不会与线程共享文件对象。您可以通过为队列提供数据行来为他们工作。然后,每个线程将接起一行,对其进行处理,然后将其返回到队列中。

在多处理模块中内置了一些更高级的功能来共享数据,例如列表和特殊的Queue。在使用多处理与线程时需要权衡取舍,这取决于您的工作是CPU约束还是IO约束。

基本的多处理池示例

这是一个多处理池的真正基本示例

from multiprocessing import Pooldef process_line(line):    return "FOO: %s" % lineif __name__ == "__main__":    pool = Pool(4)    with open('file.txt') as source_file:        # chunk the work into batches of 4 lines at a time        results = pool.map(process_line, source_file, 4)    print results

池是管理其自身进程的便捷对象。由于打开的文件可以遍历其行,因此可以将其传递到

pool.map()
,该文件将循环遍历并将行传递给worker函数。映射将阻止并在完成后返回整个结果。请注意,这是一个过于简化的示例,
pool.map()
在进行工作之前,它将立即将整个文件读入内存。如果您希望有大文件,请记住这一点。有更多高级方法可以设计生产者/消费者设置。

手动“池”,具有限制和行重新排序

这是Pool.map的手动示例,但是您可以设置队列大小,以使只按其可以处理的最快速度逐个喂入,而不是一次性消耗整个可迭代对象。我还添加了行号,以便以后可以跟踪它们并引用它们。

from multiprocessing import Process, Managerimport timeimport itertoolsdef do_work(in_queue, out_list):    while True:        item = in_queue.get()        line_no, line = item        # exit signal         if line == None: return        # fake work        time.sleep(.5)        result = (line_no, line)        out_list.append(result)if __name__ == "__main__":    num_workers = 4    manager = Manager()    results = manager.list()    work = manager.Queue(num_workers)    # start for workers        pool = []    for i in xrange(num_workers):        p = Process(target=do_work, args=(work, results))        p.start()        pool.append(p)    # produce data    with open("source.txt") as f:        iters = itertools.chain(f, (None,)*num_workers)        for num_and_line in enumerate(iters): work.put(num_and_line)    for p in pool:        p.join()    # get the results    # example:  [(1, "foo"), (10, "bar"), (0, "start")]    print sorted(results)


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/571153.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号