我通常的方法(如果可以使用额外的内存副本)是在一个进程中完成所有IO,然后将其发送到工作线程池中。要将内存映射数组的切片加载到内存中,只需这样做
x =np.array(data[yourslice])(
data[yourslice].copy()实际上并没有这样做,这可能会引起一些混乱。)。
首先,让我们生成一些测试数据:
import numpy as npnp.random.random(10000).tofile('data.dat')您可以使用以下方式重现错误:
import numpy as npimport multiprocessingdef main(): data = np.memmap('data.dat', dtype=np.float, mode='r') pool = multiprocessing.Pool() results = pool.imap(calculation, chunks(data)) results = np.fromiter(results, dtype=np.float)def chunks(data, chunksize=100): """Overly-simple chunker...""" intervals = range(0, data.size, chunksize) + [None] for start, stop in zip(intervals[:-1], intervals[1:]): yield data[start:stop]def calculation(chunk): """Dummy calculation.""" return chunk.mean() - chunk.std()if __name__ == '__main__': main()而且,如果只是改用让步
np.array(data[start:stop]),则可以解决问题:
import numpy as npimport multiprocessingdef main(): data = np.memmap('data.dat', dtype=np.float, mode='r') pool = multiprocessing.Pool() results = pool.imap(calculation, chunks(data)) results = np.fromiter(results, dtype=np.float)def chunks(data, chunksize=100): """Overly-simple chunker...""" intervals = range(0, data.size, chunksize) + [None] for start, stop in zip(intervals[:-1], intervals[1:]): yield np.array(data[start:stop])def calculation(chunk): """Dummy calculation.""" return chunk.mean() - chunk.std()if __name__ == '__main__': main()当然,这确实为每个块制作了一个额外的内存副本。
从长远来看,您可能会发现从映射文件切换到HDF之类的操作会更容易。如果您的数据是多维的,则尤其如此。(我建议
h5py,但是
pyTables如果您的数据是“类似表格的”,那就很好了。)
祝你好运,无论如何!



