If I had to guess at the main problem with your code, it's that you're passing your `input_rows` to your process function `insert()` - the way `multiprocessing.Pool.apply_async()` works is to unpack the arguments passed to it, so your `insert()` function actually receives 100 arguments instead of one argument holding a list of 100 elements. This causes an immediate error before your process function even gets a chance to start. If you change your call to `pool.apply_async(insert, [input_rows])` it might start working... You'd also be defeating the purpose of iterators, though, and you might as well convert your whole input iterator into a list and feed slices of 100 to `multiprocessing.Pool.map()` and be done with it.
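To see that unpacking behaviour in isolation, here is a minimal, arcpy-free sketch; `count_rows` is just a hypothetical stand-in for your `insert()` that expects one list argument:

```python
import multiprocessing

def count_rows(rows):
    # stand-in for insert(): expects exactly one argument, a list of rows
    return len(rows)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=1)
    input_rows = list(range(100))
    # wrong: the args sequence is unpacked, so count_rows() gets 100 arguments
    bad = pool.apply_async(count_rows, input_rows)
    # right: a one-element args list, so count_rows() receives the whole list
    good = pool.apply_async(count_rows, [input_rows])
    try:
        bad.get()
    except TypeError as e:  # raised in the worker, re-raised here by get()
        print("unpacked call failed: {}".format(e))
    print("wrapped call returned: {}".format(good.get()))
    pool.close()
    pool.join()
```

The first `get()` re-raises the worker's `TypeError` because `count_rows()` was called with 100 positional arguments; the wrapped call returns 100.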
But you asked whether there is a "better" way. While "better" is a relative category, in an ideal world `multiprocessing.Pool` comes with a handy `imap()` (and `imap_unordered()`) method intended to consume iterables and spread them over the selected pool in a lazy fashion (so no running over the whole iterator before processing), so all you need to build are your iterator slices and pass them to it for processing, i.e.:
```python
import arcpy
import itertools
import multiprocessing

# a utility function to get us a slice of an iterator, as an iterator
# when working with iterators maximum laziness is preferred
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def insert(rows):
    with arcpy.da.InsertCursor(r"c:\temp2.gdb\test", ["*"]) as i_cursor:
        for row in rows:
            i_cursor.insertRow(row)

if __name__ == "__main__":  # guard for multi-platform use
    with arcpy.da.SearchCursor(r"c:\temp.gdb\test", ["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
            pass  # do whatever you want with your result (return from your process function)
        pool.close()  # all done, close cleanly
```

(Btw, your original code wouldn't give you the last slice for any size of `s_cursor` that is not a multiple of 100.)
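In contrast, a quick standalone check (hypothetical values, no arcpy needed) shows that `iterator_slice()` from the snippet above does yield the trailing partial slice:

```python
import itertools

# same helper as above: lazily yields tuples of up to `length` items
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

# 10 is not a multiple of 4, yet the trailing (8, 9) slice is not lost
slices = list(iterator_slice(range(10), 4))
print(slices)  # [(0, 1, 2, 3), (4, 5, 6, 7), (8, 9)]
```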
But... it would be great if it actually worked as advertised. While a lot of it has been fixed over the years, `imap_unordered()` will still take a large sample of your iterator (far larger than the actual number of pool processes) when producing its own iterator, so if that's a concern you'll have to get down and dirty yourself - and you're on the right track: `apply_async()` is the way to go when you want to control how your pool is fed, you just need to make sure you feed it properly:
```python
if __name__ == "__main__":
    with arcpy.da.SearchCursor(r"c:\temp.gdb\test", ["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        cursor_iterator = iterator_slice(s_cursor, 100)  # slicer from above, for convenience
        queue = []  # a queue for our current worker async results, a deque would be faster
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(insert, [next(cursor_iterator)]))
            except (StopIteration, TypeError):  # no more data, clear out the slice iterator
                cursor_iterator = None
            # wait for a free worker or until all remaining workers finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                process.wait(0.1)  # let it breathe a little, 100ms should be enough
                if not process.ready():  # a sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    pass  # you can use process.get() to get the result if needed
        pool.close()
```

Now your `s_cursor` iterator is called only when the next 100 results are needed (whether or not your `insert()` process function exits cleanly).
UPDATE - the previously posted code had a bug in how the queue was drained at the end if you needed to capture the results; this code should do the job nicely. We can easily test it with some mock functions:
```python
import random
import time

# just an example generator to prove lazy access by printing when it generates
def get_counter(limit=100):
    for i in range(limit):
        if not i % 3:  # print every third generation to reduce verbosity
            print("Generated: {}".format(i))
        yield i

# our process function, just prints what's passed to it and waits for 1-6 seconds
def test_process(values):
    time_to_wait = 1 + random.random() * 5
    print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
    time.sleep(time_to_wait)
    print("Processed: {}".format(values))
```

Now we can entwine them together like:
```python
if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)  # let's use just 2 workers
    count = get_counter(30)  # get our counter iterator set to iterate from 0-29
    count_iterator = iterator_slice(count, 7)  # we'll process them in chunks of 7
    queue = []  # a queue for our current worker async results, a deque would be faster
    while count_iterator or queue:
        try:
            # add our next slice to the pool:
            queue.append(pool.apply_async(test_process, [next(count_iterator)]))
        except (StopIteration, TypeError):  # no more data, clear out the slice iterator
            count_iterator = None
        # wait for a free worker or until all remaining workers finish
        while queue and (len(queue) >= pool._processes or not count_iterator):
            process = queue.pop(0)  # grab a process response from the top
            process.wait(0.1)  # let it breathe a little, 100ms should be enough
            if not process.ready():  # a sub-process has not finished execution
                queue.append(process)  # add it back to the queue
            else:
                pass  # you can use process.get() to get the result if needed
    pool.close()
```
Which results in (it will vary from system to system, of course):
```
Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)
```
This proves that our generator/iterator is polled for data only when a free slot in the pool is available to do the work, ensuring minimal memory usage (and/or I/O pounding, if your iterators ultimately do that). You won't get it much more streamlined than this. The only additional, albeit marginal, speed-up you can obtain is to reduce the wait time (but your main process will then consume more resources) and to increase the allowed `queue` size (at the expense of memory), which is locked to the number of processes in the above code - if you use `while queue and (len(queue) >= pool._processes + 3 or not count_iterator):` it will load 3 more iterator slices, ensuring lower latency in situations when a process ends and a slot in your pool frees up.
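On the "a deque would be faster" comment in the code: `collections.deque.popleft()` is O(1) while `list.pop(0)` is O(n), and it is a drop-in change for the result queue. A tiny sketch with placeholder values:

```python
import collections

# a deque is a drop-in replacement for the result queue above:
# append() is unchanged and popleft() replaces the O(n) list.pop(0)
queue = collections.deque(["r1", "r2", "r3"])
queue.append("r4")
drained = []
while queue:
    drained.append(queue.popleft())  # same FIFO order, O(1) per pop
print(drained)  # ['r1', 'r2', 'r3', 'r4']
```

For a queue capped at the number of pool processes the difference is negligible; it only matters if you raise the queue size substantially.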



