
Multiprocessing pool with an iterator


If I had to guess the main problem with your code, it's in passing input_rows to your process function insert() - the way multiprocessing.Pool.apply_async() works is to unpack the arguments passed to it, so your insert() function actually receives 100 arguments instead of a single argument holding a list of 100 elements. This causes an immediate error before the process function even gets a chance to start. If you change the call to pool.apply_async(insert, [input_rows]) it may start working... But you would also be defeating the purpose of iterators; you might as well convert your whole input iterator into a list, feed slices of 100 to multiprocessing.Pool.map(), and be done with it.

But you asked whether there is a "better" way. While "better" is a relative category, in an ideal world multiprocessing.Pool comes with handy imap() (and imap_unordered()) methods designed to consume iterables and spread the work over the selected pool in a lazy fashion (so the whole iterator is not run through before processing begins). All you need to build are your iterator slices and pass them over for processing, i.e.:

import arcpy
import itertools
import multiprocessing

# a utility function to get us a slice of an iterator, as an iterator
# when working with iterators maximum laziness is preferred
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def insert(rows):
    with arcpy.da.InsertCursor(r"c:\temp2.gdb\test", fields=["*"]) as i_cursor:
        for row in rows:
            i_cursor.insertRow(row)

if __name__ == "__main__":  # guard for multi-platform use
    with arcpy.da.SearchCursor(r"c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
            pass  # do whatever you want with your result (return from your process function)
        pool.close()  # all done, close cleanly

(Incidentally, your code would not yield the last slice for any s_cursor size that is not a multiple of 100.)
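As a sanity check that the iterator_slice() helper above does emit the trailing partial slice, here is a quick standalone run over a plain range:

```python
import itertools

# same helper as above: chunk any iterator into tuples of at most `length` items
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

chunks = list(iterator_slice(range(10), 4))
print(chunks)  # [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)] - the short final slice is kept
```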

But... it would be great if it worked as advertised. While a lot has been fixed over the years, imap_unordered() will still take a large sample of your iterator (far larger than the actual number of pool processes) when producing its own iterator. So if that is a concern, you'll have to get down and dirty yourself - and you are on the right track: apply_async() is the way to go when you want to control how the pool is fed; you just need to make sure you feed it properly:


if __name__ == "__main__":
    with arcpy.da.SearchCursor(r"c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        cursor_iterator = iterator_slice(s_cursor, 100)  # slicer from above, for convenience
        queue = []  # a queue for our current worker async results, a deque would be faster
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(insert, [next(cursor_iterator)]))
            except (StopIteration, TypeError):  # no more data, clear out the slice iterator
                cursor_iterator = None
            # wait for a free worker or until all remaining finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                process.wait(0.1)  # let it breathe a little, 100ms should be enough
                if not process.ready():  # a sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    # you can use process.get() to get the result if needed
                    pass
        pool.close()

Now your s_cursor iterator is consumed only when the next 100 results are needed (regardless of whether your insert() process function exits cleanly or not).

UPDATE - the previously posted code had a bug in the final queue drain if captured results are needed; this version should do the job nicely. We can easily test it with some mock functions:

import random
import time

# just an example generator to prove lazy access by printing when it generates
def get_counter(limit=100):
    for i in range(limit):
        if not i % 3:  # print every third generation to reduce verbosity
            print("Generated: {}".format(i))
        yield i

# our process function, just prints what's passed to it and waits for 1-6 seconds
def test_process(values):
    time_to_wait = 1 + random.random() * 5
    print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
    time.sleep(time_to_wait)
    print("Processed: {}".format(values))

Now we can interleave them like this:

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)  # let's use just 2 workers
    count = get_counter(30)  # get our counter iterator set to iterate from 0-29
    count_iterator = iterator_slice(count, 7)  # we'll process them in chunks of 7
    queue = []  # a queue for our current worker async results, a deque would be faster
    while count_iterator or queue:
        try:
            # add our next slice to the pool:
            queue.append(pool.apply_async(test_process, [next(count_iterator)]))
        except (StopIteration, TypeError):  # no more data, clear out the slice iterator
            count_iterator = None
        # wait for a free worker or until all remaining workers finish
        while queue and (len(queue) >= pool._processes or not count_iterator):
            process = queue.pop(0)  # grab a process response from the top
            process.wait(0.1)  # let it breathe a little, 100ms should be enough
            if not process.ready():  # a sub-process has not finished execution
                queue.append(process)  # add it back to the queue
            else:
                # you can use process.get() to get the result if needed
                pass
    pool.close()

Which results in (it will of course vary between systems):

Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)

This proves that our generator/iterator collects data only when a free slot in the pool is available to do the work, ensuring minimal memory usage (and/or I/O pounding, if your iterator ultimately does that). You won't get it much more streamlined than this. The only additional, albeit marginal, speed-up you can get is to reduce the wait time (at the cost of the main process consuming more resources) and to increase the allowed queue size, which in the code above is locked to the number of processes, at the expense of memory - if you use while queue and (len(queue) >= pool._processes + 3 or not count_iterator): it will load 3 more iterator slices, ensuring less latency when a process finishes and a slot in the pool frees up.
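Here is a minimal, runnable sketch of that backlog variant. To keep it self-contained it swaps in multiprocessing.pool.ThreadPool (same interface as Pool), a plain range() instead of the mock generator, and a summing stub for the work; BACKLOG and work() are illustrative names, not from the original:

```python
from multiprocessing.pool import ThreadPool  # same interface as multiprocessing.Pool
import itertools

# same helper as above: chunk any iterator into tuples of at most `length` items
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def work(values):
    return sum(values)

BACKLOG = 3  # extra slices kept in flight beyond the worker count

pool = ThreadPool(2)
slices = iterator_slice(range(30), 7)
queue, results = [], []
while slices or queue:
    try:
        # feed the next slice to the pool
        queue.append(pool.apply_async(work, [next(slices)]))
    except (StopIteration, TypeError):  # no more data, clear out the slice iterator
        slices = None
    # drain once the queue exceeds workers + backlog, or when input is exhausted
    while queue and (len(queue) >= pool._processes + BACKLOG or not slices):
        process = queue.pop(0)
        process.wait(0.1)
        if not process.ready():
            queue.append(process)  # not done yet, re-queue it
        else:
            results.append(process.get())
pool.close()
pool.join()
print(sorted(results))
```

The queue now holds up to pool._processes + BACKLOG pending results, so a worker that finishes early finds new work waiting instead of stalling until the main loop's next feed.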



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/645117.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号