栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

dask分布式数据帧上的慢len函数

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

dask分布式数据帧上的慢len函数

很好的问题,这是关于何时将数据移至群集并向下移至客户端(您的python会话)的几点。让我们看一下计算的几个阶段

用熊猫加载数据

这是python会话中的Pandas数据框,因此显然仍在本地进程中。

log = pd.read_csv('800000test', sep='t')  # on client

转换为惰性Dask.dataframe

这会将您的Pandas数据帧分解为20个Pandas数据帧,但是这些仍在客户端上。Dask数据帧不会急于将数据发送到群集。

logd = dd.from_pandas(log,npartitions=20)  # still on client

计算len

调用

len
实际上会在此处引起计算(通常您会使用
df.some_aggregation().compute()
。因此,Dask开始了。首先将数据移出群集(慢速),然后在20个分区的所有分区上调用len(快速),将其聚合(快速),然后然后将结果下移到您的客户端,以便可以打印。

print(len(logd))  # costly roundtrip client -> cluster -> client

分析

所以这里的问题是我们的dask.dataframe仍然在本地python会话中拥有所有数据。

例如,使用本地线程调度程序比使用分布式调度程序要快得多。这应该以毫秒为单位计算

with dask.set_options(get=dask.threaded.get):  # no cluster, just local threads    print(len(logd))  # stays on client

但是想必您想知道如何扩展到更大的数据集,因此以正确的方式进行操作。

将数据加载到工作人员上

不要让Dask工作者在您的客户端/本地会话中加载熊猫,而是加载csv文件的位。这样,无需与客户-工人进行沟通。

# log = pd.read_csv('800000test', sep='t')  # on clientlog = dd.read_csv('800000test', sep='t')    # on cluster workers

但是,与不同

pd.read_csv
,它
dd.read_csv
是懒惰的,因此这几乎应该立即返回。我们可以强制Dask使用persist方法实际执行计算

log = client.persist(log)  # triggers computation asynchronously

现在,集群开始起作用并将您的数据直接加载到工作进程中。这是相对较快的。请注意,在后台进行工作时,此方法会立即返回。如果您要等到完成,请致电

wait

from dask.distributed import waitwait(log)  # blocks until read is done

如果您正在使用较小的数据集进行测试并希望获得更多分区,请尝试更改块大小。

log = dd.read_csv(..., blocksize=1000000)  # 1 MB blocks

无论如何,

log
现在应该可以快速进行操作

len(log)  # fast

编辑

为了回答此博客文章上的问题,以下是我们对文件所在位置的假设。

通常,当您提供文件名时

dd.read_csv
,假定该文件对所有工作人员均可见。如果您使用的是网络文件系统或S3或HDFS之类的全局存储,则为true。如果您使用的是网络文件系统,则将要使用绝对路径(例如
/path/to/myfile.*.csv
),或者确保您的工作人员和客户端具有相同的工作目录。

如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散数据。

简单但次优

简单的方法是只做您最初的工作,但是保留dask.dataframe

log = pd.read_csv('800000test', sep='t')  # on clientlogd = dd.from_pandas(log,npartitions=20)  # still on clientlogd = client.persist(logd)  # moves to workers

很好,但是导致交流效果不理想。

复杂但最优

相反,您可能会将数据明确分散到群集中

[future] = client.scatter([log])

不过,这会进入更复杂的API,因此我只将您指向docs

http://distributed.readthedocs.io/en/latest/manage-computation.html


http://distributed.readthedocs.io/en/latest/memory.htmlhttp://dask.pydata.org/en/latest/ delay-
collections.html



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/626212.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号