Making my NumPy array shared across processes


Note that you can start with an array of complex dtype:

In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32')

and view it as an array of homogeneous dtype:

In [5]: data2 = data.view('float32')

and then view it back as the complex dtype:

In [7]: data3 = data2.view('float32, (250000,2)float32')

Changing the dtype is a very quick operation; it does not affect the underlying data, only the way NumPy interprets it. So changing the dtype is virtually costless.

So anything you know about arrays with simple (homogeneous) dtypes can readily be applied to your complex dtype with the trick above.
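As a quick check, here is a minimal sketch (using a smaller (4, 2) field so it runs instantly; the sizes are illustrative, not the ones from your data) confirming that all three views share a single buffer:

import numpy as np

# Structured array: one float32 scalar plus a (4, 2) float32 field per record.
data = np.zeros(3, dtype='float32, (4,2)float32')

# Reinterpret the same buffer as a flat homogeneous array, then view it back.
flat = data.view('float32')
data3 = flat.view('float32, (4,2)float32')

# Writing through the flat view is visible in both structured views,
# confirming that the dtype change only reinterprets existing memory.
flat[0] = 99.0
assert data['f0'][0] == 99.0
assert data3['f0'][0] == 99.0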


The code below borrows many ideas from J.F. Sebastian's answer.

import numpy as np
import multiprocessing as mp
import contextlib
import ctypes
import struct
import base64


def decode(arg):
    chunk, counter = arg
    print(len(chunk), counter)
    for x in chunk:
        peak_counter = 0
        data_buff = base64.b64decode(x)
        buff_size = len(data_buff) // 4
        unpack_format = ">%dL" % buff_size
        index = 0
        for y in struct.unpack(unpack_format, data_buff):
            # Reinterpret each big-endian unsigned int as a float32.
            buff1 = struct.pack("I", y)
            buff2 = struct.unpack("f", buff1)[0]
            with shared_arr.get_lock():
                data = tonumpyarray(shared_arr).view(
                    [('f0', '<f4'), ('f1', '<f4', (250000, 2))])
                if (index % 2 == 0):
                    data[counter][1][peak_counter][0] = float(buff2)
                else:
                    data[counter][1][peak_counter][1] = float(buff2)
                    peak_counter += 1
            index += 1
        counter += 1


def pool_init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr):
    # The view() call in decode() reinterprets the raw bytes, so the
    # default dtype chosen here is immediately overridden.
    return np.frombuffer(mp_arr.get_obj())


def numpy_array(shared_arr, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors = mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size = len(peaks) // processors
        map_parameters = []
        for i in range(processors):
            counter = i * chunk_size
            # WARNING: I removed -1 from (i + 1)*chunk_size, since the right
            # index is non-inclusive.
            chunk = peaks[i*chunk_size : (i + 1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode, map_parameters)


if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250)
    peaks = ...  # the list of b64-encoded binary strings from the mzXML file
    numpy_array(shared_arr, peaks)
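To see the pattern in isolation, here is a much smaller self-contained sketch (the sizes, the name fill_record, and the per-record writes are illustrative, not from the code above): one mp.Array is allocated, each worker re-wraps it with np.frombuffer, and the structured view makes individual records addressable:

import ctypes
import multiprocessing as mp
import numpy as np

N_RECORDS, N_PEAKS = 4, 5
DTYPE = [('f0', '<f4'), ('f1', '<f4', (N_PEAKS, 2))]

def pool_init(shared_arr_):
    # Handed to each worker at pool start-up; the shared array must not
    # be passed as a regular map() argument.
    global shared_arr
    shared_arr = shared_arr_

def fill_record(i):
    # Re-wrap the shared buffer in this worker and view it as structured.
    data = np.frombuffer(shared_arr.get_obj(), dtype='<f4').view(DTYPE)
    with shared_arr.get_lock():
        data['f0'][i] = i
        data['f1'][i] = i  # broadcasts over the (N_PEAKS, 2) field

if __name__ == '__main__':
    shared_arr = mp.Array(ctypes.c_float, N_RECORDS * (1 + N_PEAKS * 2))
    with mp.Pool(2, initializer=pool_init, initargs=(shared_arr,)) as pool:
        pool.map(fill_record, range(N_RECORDS))
    data = np.frombuffer(shared_arr.get_obj(), dtype='<f4').view(DTYPE)
    print(data['f0'])  # [0. 1. 2. 3.]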

If you can guarantee that the various processes which execute the assignments

if (index % 2 == 0):
    data[counter][1][peak_counter][0] = float(buff2)
else:
    data[counter][1][peak_counter][1] = float(buff2)

never compete to alter the same locations in the data, then I believe you can actually forgo using the lock

with shared_arr.get_lock():

but I don't know your code well enough to be sure, so to be on the safe side, I included the lock.
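For illustration, a minimal sketch of that lock-free variant, under the assumption that each worker writes only to its own index (the names here are hypothetical): mp.Array accepts lock=False, which returns a raw shared array with no lock to acquire at all:

import ctypes
import multiprocessing as mp
import numpy as np

def pool_init(raw_arr_):
    global raw_arr
    raw_arr = raw_arr_

def fill(i):
    # No lock anywhere: safe only because worker i writes index i alone.
    np.frombuffer(raw_arr, dtype='<f4')[i] = i * 10.0

if __name__ == '__main__':
    raw_arr = mp.Array(ctypes.c_float, 8, lock=False)  # no wrapping lock
    with mp.Pool(2, initializer=pool_init, initargs=(raw_arr,)) as pool:
        pool.map(fill, range(8))
    print(np.frombuffer(raw_arr, dtype='<f4'))  # [ 0. 10. 20. ... 70.]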


