栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Concurrent.futures使用指南-同时使用线程和处理的简单示例

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Concurrent.futures使用指南-同时使用线程和处理的简单示例

concurrent.futures
有一个简约的API。对于非常简单的问题,它很容易使用,但是您没有非常简单的问题。如果您这样做,那么您已经解决了;-)

您没有显示

multiprocessing.Pool
您编写的任何代码,但这将是一个更有希望的起点-
假设您要解决问题多于希望确认如果仅您切换就必须容易做到的希望到较弱的API ;-)

继续使用的“明显”方法

multiprocessing
是使用
Pool.apply_async()
方法,将异步结果对象放在bounded上
Queue.Queue
,并使主程序中的线程将其从中拉出,
Queue
然后等待结果显示出来。这很容易,但这不是魔术。它解决了您的问题,因为有界
Queues

以不同速度运行的生产者和消费者之间进行调解 规范方法。没有任何内容 可以 直接
concurrent.futures
解决
问题,而这是“大量内存”问题的核心。

# Define global result_queue only in the main program.import Queueresult_queue = Queue.Queue(100)  # pick a reasonable max size based on your problem# Run this in as many threads as you like.def consume_results():    while True:        a = result_queue.get()        if a is None: break        output(a.get())  # `output()` is your function...# main program passes out work, after starting threadsfor i in range(1000):    # the .put() will block so long as the queue is at its max size    result_queue.put(pool.apply_async(calculate, args=(i,)))# add sentinels to let threads know they're donefor i in range(number_of_threads_you_started):    result_queue.put(None)

这是您 需要 使生产者和消费者大致保持平衡的事情,而且任何标准库中都没有什么可以神奇地为您做到的。

编辑-充实它

这是一个完整的可执行示例,任何使用Python3的人都可以运行。笔记:

  • 它不使用您的代码片段,因为这些代码片段依赖于外部数据库模块,并非每个人都可以运行。
  • 它坚持要
    concurrent.futures
    同时管理进程和线程。它实际上
    multiprocessing
    并没有那么难使用
    threading
    ,实际上,这里使用线程的 方式 直接使用起来会容易一些
    threading
    。但是这种方式很明显。
  • 一个
    concurrent.futures
    Future
    对象基本上是同样的事情作为一个
    multiprocessing
    异步结果对象- API的功能只是拼写不同。
  • 您的问题并不简单,因为它具有可以以不同速度运行的多个阶段。同样,任何标准库中的任何事物都无法通过魔术掩盖潜在的不良后果。创建自己的有界队列仍然是最好的解决方案。对于任何合理的值,此处的内存使用量将保持适中
    MAX_QUEUE_SIZE
  • 通常 ,您所希望创建的CPU约束工作进程的数量少于可使用的内核数量。主程序还需要运行周期,操作系统也需要运行周期。
  • 一旦习惯了这些内容,此代码中的所有注释都会令人讨厌,就像在代码行上看到注释“以1递增”
    i += 1
    ;-)

这是代码:

import concurrent.futures as cfimport threadingimport queueNUM_CPUS = 3NUM_THREADS = 4MAX_QUEUE_SIZE = 20# Runs in worker processes.def producer(i):    return i + 10def consumer(i):    global total    # We need to protect this with a lock because    # multiple threads in the main program can    # execute this function simultaneously.    with sumlock:        total += i# Runs in threads in main program.def consume_results(q):    while True:        future = q.get()        if future is None: break        else: consumer(future.result())if __name__ == "__main__":    sumlock = threading.Lock()    result_queue = queue.Queue(MAX_QUEUE_SIZE)    total = 0    NUM_TO_DO = 1000    with cf.ThreadPoolExecutor(NUM_THREADS) as tp:        # start the threads running `consume_results`        for _ in range(NUM_THREADS): tp.submit(consume_results, result_queue)        # start the worker processes        with cf.ProcessPoolExecutor(NUM_CPUS) as pp: for i in range(NUM_TO_DO):     # blocks until the queue size <= MAX_QUEUE_SIZE     result_queue.put(pp.submit(producer, i))        # tell threads we're done        for _ in range(NUM_THREADS): result_queue.put(None)    print("got", total, "expected", (10 + NUM_TO_DO + 9) * NUM_TO_DO // 2)

如果一切顺利,这是预期的输出:

got 509500 expected 509500


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/611874.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号