栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

Python分块CSV文件多处理

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python分块CSV文件多处理

根据评论,我们希望每个进程可以处理10000行的块。这并不是很难做到的。请参阅

iter/islice
下面的食谱。但是,使用问题

pool.map(worker, ten_thousand_row_chunks)

pool.map
将尝试把 所有块 的任务队列一次。如果这需要的内存超过可用内存,那么您将获得一个
MemoryError
。(注意:
pool.imap

遇到相同的问题。)

因此,我们需要对

pool.map
每个块的各个部分进行迭代调用。

import itertools as ITimport multiprocessing as mpimport csvdef worker(chunk):    return len(chunk)def main():    # num_procs is the number of workers in the pool    num_procs = mp.cpu_count()    # chunksize is the number of lines in a chunk    chunksize = 10**5    pool = mp.Pool(num_procs)    largefile = 'Counseling.csv'    results = []    with open(largefile, 'rb') as f:        reader = csv.reader(f)        for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []): chunk = iter(chunk) pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), [])) result = pool.map(worker, pieces) results.extend(result)    print(results)    pool.close()    pool.join()main()

每个文件

chunk
最多包含
chunksize*num_procs
文件中的几行。这些数据足以使池中的所有工作人员都能工作,但又不会太大,以至于引起MemoryError
-如果设置的
chunksize
值没有太大。

每个

chunk
然后被分解成块,其中每个块由多达的
chunksize
行从该文件。然后将这些片段发送到
pool.map


如何

iter(lambda: list(IT.islice(iterator, chunksize)), [])
工作

这是将迭代器分为长度为chunksize的块的惯用法。让我们看一下它如何工作的例子:

In [111]: iterator = iter(range(10))

请注意,每次

IT.islice(iterator, 3)
调用时,都会从迭代器中切出一个新的3个项目块:

In [112]: list(IT.islice(iterator, 3))Out[112]: [0, 1, 2]In [113]: list(IT.islice(iterator, 3))Out[113]: [3, 4, 5]In [114]: list(IT.islice(iterator, 3))Out[114]: [6, 7, 8]

当迭代器中剩余的项目少于3个时,仅返回剩余的内容:

In [115]: list(IT.islice(iterator, 3))Out[115]: [9]

如果再次调用它,则会得到一个空列表:

In [116]: list(IT.islice(iterable, 3))Out[116]: []

lambda: list(IT.islice(iterator, chunksize))
是一个
list(IT.islice(iterator,chunksize))
在调用时返回的函数。这是“单线”,相当于

def func():    return  list(IT.islice(iterator, chunksize))

最后,

iter(callable,sentinel)
返回另一个迭代器。该迭代器产生的值是可调用函数返回的值。它会不断产生值,直到可调用对象返回等于前哨值的值为止。所以

iter(lambda: list(IT.islice(iterator, chunksize)), [])

将继续返回值,

list(IT.islice(iterator, chunksize))
直到该值是空列表为止:

In [121]: iterator = iter(range(10))In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), []))Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/625419.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号