很好的问题,这是关于何时将数据移至群集并向下移至客户端(您的python会话)的几点。让我们看一下计算的几个阶段
用熊猫加载数据
这是python会话中的Pandas数据框,因此显然仍在本地进程中。
log = pd.read_csv('800000test', sep='t') # on client转换为惰性Dask.dataframe
这会将您的Pandas数据帧分解为20个Pandas数据帧,但是这些仍在客户端上。Dask数据帧不会急于将数据发送到群集。
logd = dd.from_pandas(log,npartitions=20) # still on client
计算len
调用
len实际上会在此处引起计算(通常您会使用
df.some_aggregation().compute()。因此,Dask开始了。首先将数据移出群集(慢速),然后在20个分区的所有分区上调用len(快速),将其聚合(快速),然后然后将结果下移到您的客户端,以便可以打印。
print(len(logd)) # costly roundtrip client -> cluster -> client
分析
所以这里的问题是我们的dask.dataframe仍然在本地python会话中拥有所有数据。
例如,使用本地线程调度程序比使用分布式调度程序要快得多。这应该以毫秒为单位计算
with dask.set_options(get=dask.threaded.get): # no cluster, just local threads print(len(logd)) # stays on client
但是想必您想知道如何扩展到更大的数据集,因此以正确的方式进行操作。
将数据加载到工作人员上
不要让Dask工作者在您的客户端/本地会话中加载熊猫,而是加载csv文件的位。这样,无需与客户-工人进行沟通。
# log = pd.read_csv('800000test', sep='t') # on clientlog = dd.read_csv('800000test', sep='t') # on cluster workers但是,与不同
pd.read_csv,它
dd.read_csv是懒惰的,因此这几乎应该立即返回。我们可以强制Dask使用persist方法实际执行计算
log = client.persist(log) # triggers computation asynchronously
现在,集群开始起作用并将您的数据直接加载到工作进程中。这是相对较快的。请注意,在后台进行工作时,此方法会立即返回。如果您要等到完成,请致电
wait。
from dask.distributed import waitwait(log) # blocks until read is done
如果您正在使用较小的数据集进行测试并希望获得更多分区,请尝试更改块大小。
log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks
无论如何,
log现在应该可以快速进行操作
len(log) # fast
编辑
为了回答此博客文章上的问题,以下是我们对文件所在位置的假设。
通常,当您提供文件名时
dd.read_csv,假定该文件对所有工作人员均可见。如果您使用的是网络文件系统或S3或HDFS之类的全局存储,则为true。如果您使用的是网络文件系统,则将要使用绝对路径(例如
/path/to/myfile.*.csv),或者确保您的工作人员和客户端具有相同的工作目录。
如果不是这种情况,并且您的数据仅在您的客户端计算机上,那么您将不得不加载并分散数据。
简单但次优
简单的方法是只做您最初的工作,但是保留dask.dataframe
log = pd.read_csv('800000test', sep='t') # on clientlogd = dd.from_pandas(log,npartitions=20) # still on clientlogd = client.persist(logd) # moves to workers很好,但是导致交流效果不理想。
复杂但最优
相反,您可能会将数据明确分散到群集中
[future] = client.scatter([log])
不过,这会进入更复杂的API,因此我只将您指向docs
http://distributed.readthedocs.io/en/latest/manage-computation.html
http://distributed.readthedocs.io/en/latest/memory.htmlhttp://dask.pydata.org/en/latest/ delay-
collections.html



