栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

熊猫df.iterrows()并行化

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

熊猫df.iterrows()并行化

就像@Khris在他的评论中说的那样,您应该将数据帧分成几个大块,并并行地遍历每个块。您可以将数据帧任意分成随机大小的块,但是根据您计划使用的进程数将数据帧分成大小相等的块更有意义。幸运的是,已经有人想出了如何为我们做这部分工作:

# don't forget to importimport pandas as pdimport multiprocessing# create as many processes as there are CPUs on your machinenum_processes = multiprocessing.cpu_count()# calculate the chunk size as an integerchunk_size = int(df.shape[0]/num_processes)# this solution was reworked from the above link.# will work even if the length of the dataframe is not evenly divisible by num_processeschunks = [df.ix[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]

这将创建一个列表,其中包含我们的数据框。现在,我们需要将其与将操纵数据的函数一起传递到池中。

def func(d):   # let's create a function that squares every value in the dataframe   return d * d# create our pool with `num_processes` processespool = multiprocessing.Pool(processes=num_processes)# apply our function to each chunk in the listresult = pool.map(func, chunks)

此时,

result
将是一个列表,其中包含每个已被操作的块。在这种情况下,所有值均已平方。现在的问题是原始数据框尚未修改,因此我们必须用池中的结果替换其所有现有值。

for i in range(len(result)):   # since result[i] is just a dataframe   # we can reassign the original dataframe based on the index of each chunk   df.ix[result[i].index] = result[i]

现在,我对数据框进行操作的功能已经过矢量化处理,如果我将其简单地应用于整个数据框而不是拆分成块的话,可能会更快。但是,在您的情况下,您的函数将遍历每个块的每一行,然后返回该块。这使您可以一次处理

num_process
行。

def func(d):   for row in d.iterrow():      idx = row[0]      k = row[1]['Chromosome']      start,end = row[1]['Bin'].split('-')      sequence = sequence_from_coordinates(k,1,start,end) #slow download form http      d.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))      d.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))      d.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))   # return the chunk!   return d

然后,您可以在原始数据帧中重新分配值,并成功并行化了此过程。

我应该使用多少个进程?

您的最佳性能将取决于此问题的答案。而“所有过程!!!!”
是一个答案,更好的答案更细微。在某一点之后,在一个问题上投入更多的流程实际上会产生超出其价值的开销。这就是阿姆达尔定律。再次,我们很幸运
一个很好的默认值是使用

multiprocessing.cpu_count()
,这是的默认行为
multiprocessing.Pool

根据文档“如果进程为None,则使用cpu_count()返回的数字。”
这就是为什么我一
num_processes
开始就设置为
multiprocessing.cpu_count()
。这样,如果您使用功能更强大的机器,则无需
num_processes
直接更改变量即可从中受益。




转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/638793.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号