DistGraph Server and Client
Server(含对比)
RPC OPKVStore OP Client(含对比)
RPC OPMultiRPC OPKVStore OPMulti-KVStore OP DistGraphndata
node_attr_schemes_ndata_store_client._data_store_ntype_map_get_ndata_namesnode_split
even or uneven
force_even=Falseforce_even=True DisEmbedding
SparseAdagrad DistTensor
initialization
_name and _part_policyadd as DistGraph featureadd as DistGraph feature with specific policyreference to one that exits多次定义 about shared_memoryMulti-Client and trainers
partition and node_splitabout role in DGLmultiprocessing.Manager Standalone modeHeterogenrous Graph
DistGraph Server and Client Server(含对比)这里面的Server的启动方法直接采用DGL自带的DistGraphServer来启动,看代码
def run_server(graph_name, serv_id, num_servers, num_clients, shared_mem):
    """Launch one DistGraphServer for the given partition and serve forever."""
    from dgl.distributed import DistGraphServer
    # The partition metadata produced by partition_graph() lives in
    # dist_graph/<graph_name>.json; shared memory can be toggled off.
    server = DistGraphServer(server_id=serv_id, ip_config="kv_ip_config.txt",
                             num_servers=num_servers,
                             num_clients=num_clients,
                             part_config="dist_graph/{}.json".format(graph_name),
                             disable_shared_mem=not shared_mem,
                             graph_format=['csc', 'coo'])
    print('start server', serv_id)
    server.start()
DistGraphServer为KVServer的子类,在该类里定义了一些backup_server,shared_memory以及load_partition,init_data,差不多类似于之前的,来看这个start函数
def start(self):
    """ Start graph store server.
    """
    # Bundle the KV store (self), the locally held graph partition and the
    # partition book into a ServerState, then enter the serving loop.
    state = ServerState(kv_store=self, local_g=self.client_g, partition_book=self.gpb)
    print('start graph service on server {} for part {}'.format(self.server_id, self.part_id))
    start_server(server_id=self.server_id,
                 ip_config=self.ip_config,
                 num_servers=self.num_servers,
                 num_clients=self.num_clients,
                 server_state=state)
熟悉的配方,都是先ServerState再start_server,通过这样来启动server。
再来看看之前在RPC OP和KVStore OP中的server定义方式。
def start_server(num_client, ip_config, server_id=0):
    """Start a bare RPC server: no KV data, it only relays registered messages."""
    import time
    print("Sleep 2 seconds to test client re-connect")
    time.sleep(2)
    # ServerState carries no kv_store/graph/partition book — this server
    # exists purely to answer the registered HELLO service.
    state = dgl.distributed.ServerState(None, local_g=None, partition_book=None)
    dgl.distributed.register_service(HELLO_SERVICE_ID, HelloRequest, HelloResponse)
    print("Start server {}".format(server_id))
    dgl.distributed.start_server(server_id=server_id,
                                 ip_config=ip_config,
                                 num_servers=1,
                                 num_clients=num_client,
                                 server_state=state)
非常简单地使用start_server,ServerState中的参数什么都没定义,因为并没有涉及任何数据库交互的过程,只是单纯传递信息。
KVStore OPdef start_server(server_id, num_clients, num_servers):
import time
print("Sleep 5 seconds to test client re-connect")
time.sleep(5)
kvstore = dgl.distributed.KVServer(server_id = server_id,
ip_config="kv_ip_config.txt",
num_servers = num_servers,
num_clients = num_clients)
kvstore.add_part_policy(node_policy)
kvstore.add_part_policy(edge_policy)
if kvstore.is_backup_server():
kvstore.init_data('data_0', 'node:_N')
kvstore.init_data('data_0_1', 'node:_N')
kvstore.init_data('data_0_2', 'node:_N')
kvstore.init_data('data_0_3', 'node:_N')
else:
kvstore.init_data('data_0', 'node:_N', data_0)
kvstore.init_data('data_0_1', 'node:_N', data_0_1)
kvstore.init_data('data_0_2', 'node:_N', data_0_2)
kvstore.init_data('data_0_3', 'node:_N', data_0_3)
server_state = dgl.distributed.ServerState(kv_store = kvstore, local_g=None, partition_book=None)
dgl.distributed.start_server(server_id = server_id,
ip_config="kv_ip_config.txt",
num_servers = num_servers,
num_clients = num_clients,
server_state = server_state)
定义了数据库并初始化了一些数据库的索引内容,但是没有存图。
Client(含对比)这里使用initialize和DistGraph来启动一个client
def run_client(graph_name, part_id, num_servers, num_clients, num_nodes, num_edges):
    """Connect to the servers, build a DistGraph, and run the embedding checks."""
    import os
    from dgl.distributed import load_partition_book, DistGraph
    # initialize() reads DGL_NUM_SERVER to know how many servers per machine.
    os.environ['DGL_NUM_SERVER'] = str(num_servers)
    dgl.distributed.initialize('kv_ip_config.txt')
    gpb, graph_name, _, _ = load_partition_book('dist_graph/{}.json'.format(graph_name), part_id, None)
    print(graph_name)
    dist_g = DistGraph(graph_name, gpb)
    check_dist_emb(dist_g, num_clients, num_nodes, num_edges)
其中initialize中比较重要的就是使用connect_to_server来连接到Server,此外DistGraph在_init()中会调用get_kvstore().此外还会调用_get_graph_from_shared_mem从server拿到一个graph,这个graph没有feature tensor但是有’inner_node’和’inner_edge’以及node/edge的ID。
另外,这个DistGraph非常重要,单独放在下一节讲。
def start_client(ip_config):
    """Register the HELLO service and connect to the RPC server."""
    from numpy.testing import assert_array_equal
    # Once connected, registered Request objects can be sent over RPC
    # and the matching Response received.
    dgl.distributed.register_service(HELLO_SERVICE_ID, HelloRequest, HelloResponse)
    dgl.distributed.connect_to_server(ip_config=ip_config, num_servers=1)
这里面只是使用connect_to_server连接到了Server端,这样就可以通过rpc传递已经注册的Request并接受Response。
MultiRPC OPdef start_client(num_clients, num_servers):
import os
import time
os.environ['DGL_DIST_MODE'] = 'distributed'
dgl.distributed.initialize("kv_ip_config.txt")
kvclient = dgl.distributed.KVClient("kv_ip_config.txt",
num_servers = num_servers)
kvclient.map_shared_data(partition_book=gpb)
这里使用了initialize来连接和KVClient来启动一个KV数据库client。值得注意的是,这里的map_shared_data中的gpb其实是全局变量,等于说这里的shared memory应该是同一台机器的gpb。
Multi-KVStore OPdef start_client_mul_role(i, config):
import os
import time
os.environ['DGL_DIST_MODE'] = 'distributed'
dgl.distributed.initialize(config)
if i==0:
time.sleep(5)
kvclient = dgl.distributed.kvstore.get_kvstore()
kvclient.barrier()
print(kvclient.client_id, kvclient.role, kvclient.machine_id)
print(dgl.distributed.get_num_client())
print("i: %d role: %s" % (i, kvclient.role))
assert dgl.distributed.role.get_num_trainers() == 2
assert dgl.distributed.role.get_trainer_rank() < 2
print('trainer rank: %d, global rank: %d' % (dgl.distributed.role.get_trainer_rank(),
dgl.distributed.role.get_global_rank()))
dgl.distributed.exit_client()
这里通过get_store()来获取initialize中定义初始化的KVClient客户端,每个Client都一个role方法,machine_id以及client_id,这些都在init_role中定义。用以区分Sampler Client和Trainer Client。
DistGraphDistGraph其实就是分布式中的DGL Graph,且运行在Client上。关于它和partition的关系,DGL 这样说道:
DistGraph provides a global view of the distributed graph. Internally,
it may contains a partition of the graph if it is co-located with
the server. When servers and clients run on separate sets of machines,
this returns None.
这是DistGraph的local_partition属性,其返回值也就是self._g,而这个图就是从server端通过_get_graph_from_shared_mem拿到的图。这也再次证明了这里的shared memory应该就是针对同一个机器上的。
此外NodeDataView和EdgeDataView可以帮助我们看到Graph的ndata和edata。
取feature(通过NodeDataView类),例如
g.ndata['feature_name']node_attr_schemes
取node属性scheme(方案),只有名字没有具体feature值
:
# Test write data
new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
g.ndata['test1'][nids] = new_feats
feats = g.ndata['test1'][nids]
assert np.all(F.asnumpy(feats) == 1)
# Test metadata operations.
assert len(g.ndata['features']) == g.number_of_nodes()
assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
assert g.ndata['features'].dtype == F.int64
assert g.node_attr_schemes()['features'].dtype == F.int64
assert g.node_attr_schemes()['test1'].dtype == F.int32
assert g.node_attr_schemes()['features'].shape == (1,)
_ndata_store
g = DistGraph(graph_name, gpb=gpb)
print(g._ndata_store)
for ntype in g._ndata_store:
print(ntype)
print(g._ndata_store[ntype])
结果,会将所有node的feature全部取出来(但是没有命名的不会!)
{'features': }
features
DistTensor直接print不出来,但是可以通过一个1D tensor来取一个切片来print出来。
_client._data_store通过DistGraph所属的KVClient来访问KVStore,能直接返回数据库中的所有data name+feature(包括node,edge,未命名的也会有!,并且可以直接print)
结果
{'node:_N:features':tensor([]....),
'edge:_E:features': tensor([]....),
'node:_N:anonymous-default-0': tensor(),,,,
}
非常详细
_ntype_mapself._ntype_map = {ntype:i for i, ntype in enumerate(self.ntypes)}
def ntypes(self):
    """Return the list of node types of this graph.

    Returns
    -------
    list of str

    Examples
    --------
    >>> g = DistGraph("test")
    >>> g.ntypes
    ['_U']
    """
    # The node-type schema is owned by the graph partition book.
    return self._gpb.ntypes
结果
{'_N': 0}
_get_ndata_names
names = g._get_ndata_names()
print(names)
for name in names:
print(name)
dtype, shape, policy = g._client.get_data_meta(str(name))
print(dtype, shape, policy)
结果
[] node:_N:features torch.int64 (101, 1)
直接print不出来,但是可以通过循环调用iter方法得到里面的值
关于Partition,可以参考DGL KVStore的partition部分。
这个函数是根据一组bool型的tensor以及图的PartitionBook返还一组tensor,返还的tensor为选取的node id。
举例
selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
# Test node split
nodes = node_split(selected_nodes, g.get_partition_book())
print(selected_nodes)
print(nodes)
结果:
[False True True True False False False False False True False True
False True True False True True True True True False True True
False True True True True False True True True False False True
True False False True True False True True True False True False
True True True True True True True True True True True True
True False True True False False False True False False True True
True True False False False True True False True True False True
False True True True True False True True True False True True
True True True False False]
tensor([ 1, 2, 3, 9, 11, 13, 14, 16, 17, 18, 19, 20, 22, 23, 25, 26, 27, 28,
30, 31, 32, 35, 36, 39, 40, 42, 43, 44, 46, 48, 49, 50, 51, 52, 53, 54,
55, 56, 57, 58, 59, 60, 62, 63, 67, 70, 71, 72, 73, 77, 78, 80, 81, 83,
85, 86, 87, 88, 90, 91, 92, 94, 95, 96, 97, 98])
后面Multi-Client and trainer中还介绍了加入num_trainer_per_machine这个参数后的情况。
even or uneven在node_split函数中,有一个非常重要的参数force_even,默认为False。
按照官方的说法,
force_even = False,最大化局部性的优势,即把inner_node与node_mask中所有的都分出来force_even - True,将所有进程的node数都尽量分成一致,并且所有的进程的node数量之和为node_mask的长度,这种情况下有的node可能就不是inner_node了,但是会最大限度满足这个局部性
分别举例:
force_even=False
def example3():
    """Partition a random graph and check node_split(force_even=False).

    With force_even=False, node_split keeps locality: for each partition it
    returns exactly the selected nodes that are inner nodes of that partition.
    (Indentation reconstructed from a paste that stripped it — TODO confirm.)
    """
    import numpy as np
    from dgl.distributed import partition_graph
    from dgl.distributed import load_partition, node_split, edge_split
    g= create_random_graph(101)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, 'dist_graph', num_hops=num_hops, part_method='metis')
    # Random boolean masks selecting roughly 70% of nodes/edges.
    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    ## np.nonzero return the x-index and y-index of nonzero position of a 2d vector
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]
    def set_roles(num_clients):
        # Fake the role bookkeeping normally set up by dgl.distributed.initialize().
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}
    num_part = 1
    for i in range(num_parts):
        set_roles(num_parts)
        part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition('dist_graph/dist_graph_test.json', i)
        # print(part_g.ndata['inner_node'])
        # print(part_g.ndata)
        # Global IDs of the nodes owned by partition i.
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        # print(local_nids)
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        # print(local_nids)
        ## np.intersect1d return a list of the same value of two 1d vector
        nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
        nodes2 = node_split(node_mask, gpb, rank=i, force_even=False)
        # node_split must return exactly the selected inner nodes of this rank.
        assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes2)))
        local_nids = F.asnumpy(local_nids)
        for n in nodes1:
            assert n in local_nids
force_even=True
def example4():
    """Partition a random graph and check node_split/edge_split with force_even=True.

    With force_even=True every rank gets roughly the same number of IDs and the
    per-rank results concatenate back to exactly the full selected set.
    (Indentation reconstructed; the middle of the loop is elided in the notes.)
    """
    from dgl.distributed import partition_graph, load_partition, node_split, edge_split
    import numpy as np
    g = create_random_graph(101)
    num_parts = 4
    num_hops = 2
    partition_graph(g, 'dist_graph_test', num_parts, 'dist_graph', num_hops, part_method='metis')
    node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
    edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
    selected_nodes = np.nonzero(node_mask)[0]
    selected_edges = np.nonzero(edge_mask)[0]
    # Per-rank results are accumulated here and compared against the full set below.
    all_nodes1 = []
    all_nodes2 = []
    all_edges1 = []
    all_edges2 = []
    def set_roles(num_clients):
        # Fake the role bookkeeping normally set up by dgl.distributed.initialize().
        dgl.distributed.role.CUR_ROLE = 'default'
        dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
        dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}
    for i in range(num_parts):
        set_roles(num_parts)
        part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition('dist_graph/dist_graph_test.json', i)
        local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
        local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
        nodes = node_split(node_mask, gpb, rank=i, force_even=True)
        # nodes_uneven = node_split(node_mask, gpb, rank=i, force_even=False)
        # print(nodes, nodes_uneven)
        all_nodes1.append(nodes)
        subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
        print('part {} get {} nodes and {} are in the partition'.format(i, len(nodes), len(subset)))
        ......
        # NOTE(review): the original notes elide the edge_split / all_nodes2 /
        # all_edges1 / all_edges2 bookkeeping here.
    all_nodes1 = F.cat(all_nodes1, 0)
    all_edges1 = F.cat(all_edges1, 0)
    all_nodes2 = F.cat(all_nodes2, 0)
    all_edges2 = F.cat(all_edges2, 0)
    all_nodes = np.nonzero(node_mask)[0]
    all_edges = np.nonzero(edge_mask)[0]
    # Concatenated per-rank splits must equal the full selected sets.
    assert np.all(all_nodes == F.asnumpy(all_nodes1))
    assert np.all(all_edges == F.asnumpy(all_edges1))
    assert np.all(all_nodes == F.asnumpy(all_nodes2))
    assert np.all(all_edges == F.asnumpy(all_edges2))
DistEmbedding
DistEmbedding在之前的分布式训练中都没遇到过,但是这里却单独拿出来,并且与优化器SparseAdagrad同时使用,不得不让人猜测这个OP很有可能就是和GNN的weights相关。参数如下
num_embedding: 与node或者edge的数量相等
优化器,参数为:
list: dgl.distributed.DistEmbeddinglr: learning_rate
这个优化器貌似自带正则,因为每次更新的梯度和之前不太一样,按照官网解释:
后面的一项可能就是原因了。
将两者合在一起,举个例子
def check_dist_emb(g, num_clients, num_nodes, num_edges):
    """Create a DistEmbedding over all nodes and run three SparseAdagrad steps.

    NOTE(review): the snippet opens a ``try:`` whose matching except/finally
    clause was truncated in the original notes; indentation was also
    reconstructed from an unindented paste — confirm the nesting of
    backward()/step() relative to record_grad() against the DGL tests.
    """
    from dgl.distributed.optim.pytorch import SparseAdagrad
    from dgl.distributed.nn.pytorch import DistEmbedding
    import numpy as np
    import time
    from numpy.testing import assert_almost_equal
    try:
        # Stagger clients slightly so they do not race on startup.
        if g._client.client_id == 0:
            time.sleep(0.3)
        else:
            time.sleep(0.5)
        ## refer to https://docs.dgl.ai/api/python/dgl.distributed.html?highlight=distembedding#dgl.distributed.DistEmbedding
        # One 1-dim embedding per node, named 'emb1', initialized by emb_init.
        emb = DistEmbedding(g.number_of_nodes(), 1, 'emb1', emb_init)
        # print("emb")
        nids = F.arange(0, int(g.number_of_nodes()))
        lr = 0.1
        optimizer = SparseAdagrad([emb], lr)
        ##SparseAdagrad will add regularization item automatically
        #With multi-clients setting, racing will cause embedding update confusing, and will affect the next step.
        with F.record_grad():
            feats = emb(nids)
            # print(np.shape(F.asnumpy(feats)))
            # Embeddings start at zero before the first update.
            assert np.all(F.asnumpy(feats)==np.zeros((len(nids), 1)))
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        with F.record_grad():
            feats = emb(nids)
            print(feats)
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        with F.record_grad():
            feats = emb(nids)
            print(feats)
            loss = F.sum(feats + 1, 0)
        loss.backward()
        optimizer.step()
        feats = emb(nids)
        print(feats)
这里就是将emb更新了三次。但是一个有意思的点就是,如果在有多台Client的时候,会出现竞争的现象,导致某处embedding更新顺序混乱(这是因为多台Client共用一个DistEmbedding,一台update完其他Client再访问这个DistEmbedding都会改变),这个混乱会一直影响后面的更新。
另外,在进行Client端group的测试时,由于我的DGL Version不支持DistGraphServer的keep_alive参数设置,所以Server在运行完第一个group的Client代码的时候,会自动shutdown,这时我只能手动重启一次Server,让第二个group的Client也连上Server并跑完。此时看到DistEmbedding重置了,也就是说DistEmbedding是根据Server来共享的,如果Server没有了,DistEmbedding也就随之销毁,再开启Server的时候DistEmbedding也会再次初始化。
在进行group调试的时候,我想再次复现这种效果,但是就是跑不出来了。于是想先后开两个Server,等group中第一个Client结束后再开另一个Server,让group中另一个Client连上,但是这个时候出现了一个之前在跑Dist GraphSage时常见的Error:
端口被占用!很明显两个进程的Server都想占有10000端口,所以第二个Server在启动时报错。
定义一个DistTensor只需要给出shape和type即可,若不给init_func,则全部初始化为0。
如果在定义一个DistTensor的时候没有给出其名字,则将被命名为‘node:_N’/‘edge:_E’+‘anonymous’+‘get_role()’(一般为default)+‘dist_tensor_id’(从0开始);若有名字(例如test_name),则名字为‘node:_N’/‘edge:_E’+‘test_name’(例如‘node:_N:test_name’)。另外,在定义DistGraph的时候,其最初的feature tensor也会按照原名字被自动定义为DistTensor。
例如
new_shape = (g.number_of_nodes(), 2)
test1 = dgl.distributed.DistTensor(new_shape, F.int32)
print(test1._part_policy, test1._name)
g.ndata['test1'] = test1
feats = g.ndata['test1'][nids]
assert np.all(F.asnumpy(feats) == 0)
assert test1.count_nonzero() == 0
打印出来的结果就是
node:_N:anonymous-default-0
这里我一直好奇为什么我并没有定义它的partition_policy,为什么它会知道是NodePartitionPolicy呢?
后面的示例可以使用DistTensor直接将某个tensor加到DistGraph的feature上
code如下
def check_dist_graph_empty(g, num_clients, num_nodes, num_edges):
    """Exercise DistTensor create/delete and ndata read/write on a featureless graph."""
    import numpy as np
    # Graph statistics must match what the partitioner produced.
    assert g.number_of_nodes() == num_nodes
    assert g.number_of_edges() == num_edges
    # Attach a fresh zero-initialized distributed tensor as node data.
    new_shape = (g.number_of_nodes(), 2)
    g.ndata['test1'] = dgl.distributed.DistTensor(new_shape, F.int32)
    nids = F.arange(0, int(g.number_of_nodes() / 2))
    assert np.all(F.asnumpy(g.ndata['test1'][nids]) == 0)
    # Create a named tensor, destroy it, and re-create it with another shape.
    tmp = dgl.distributed.DistTensor(new_shape, F.float32, 'test3', init_func=rand_init)
    del tmp
    tmp = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test3')
    del tmp
    # A write through ndata must be readable back.
    written = F.ones((len(nids), 2), F.int32, F.cpu())
    g.ndata['test1'][nids] = written
    readback = g.ndata['test1'][nids]
    # print(readback)
    assert np.all(F.asnumpy(readback) == 1)
    # Metadata survives the round trip.
    assert g.node_attr_schemes()['test1'].dtype == F.int32
    print(g.node_attr_schemes())
    print('end')
add as DistGraph feature with specific policy
分布式的tensor object被切分和store在机器集群之间,切分原则就是根据parttion policy,可以参考之前KVStore的data分布。
举个
rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
# print(rest)
feats1 = emb(rest)
assert np.all(F.asnumpy(feats1)==np.zeros((len(rest), 1)))
policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
# print("partition_book is RangePartitionBook ",g.get_partition_book())
grad_sum = dgl.distributed.DistTensor((g.number_of_nodes(),1), F.float32, 'emb1_sum', policy)
if num_clients == 1:
assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)) * num_clients)
# print(grad_sum[nids])
assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))
reference to one that exists
# reference to a one that exists
test2 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2', init_func=rand_init)
test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2')
assert np.all(F.asnumpy(test2[nids]) == F.asnumpy(test3[nids]))
很简单通过相同名字来引用
多次定义这里还有一个多次定义的栗子
# add tests for anonymous distributed tensor.
test3 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
data = test3[0:10]
test4 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
print(test4._name, test4[nids])
del test3
test5 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
assert np.sum(F.asnumpy(test5[0:10] != data)) > 0
# test a persistent tesnor
test4 = dgl.distributed.DistTensor(new_shape, F.float32, 'test4', init_func=rand_init,
persistent=True)
print(test4._name,test4[nids])
del test4
try:
test4 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test4')
print(test4._name,test4)
raise Exception('')
except:
pass
出来的结果为
node:_N:anonymous-default-2 node:_N:test4
两次tensor的值不一样,说明两次定义都会重新初始化,是一个覆盖的过程。而且try里面的语句不会执行,因为shape不一致,如果强制执行会报错
AssertionError: The shape does not match with the existing tensor
说明del的不是很彻底,也有可能是取了名字的缘故。
about shared_memory在调试shared_mem这个参数时,发现如果把这个参数disable时,在启动DistGraphServer时,也就是在Server端,不会进行
_copy_graph_to_shared_mem 和 shared_memory
以上两个函数分别针对graph以及graph partition book
所以在Client中的DistGraph中以下两个函数的返回值为None
_get_graph_from_shared_memget_shared_mem_partition_book
这里需要讲一下在切割图的时候是可以设定参数num_trainer_per_machine来设定切完的partition的属性。虽然这个参数我还不确定会不会加速切割过程,但是通过它切完之后,可以把每个partition分到和num_trainer_per_machine相等的client上(在同一台机器上)。这也可能就是这个参数取名的由来吧。具体可以参考partition OP中关于这个参数的介绍.
在多台机器的环境下,应该是每台机器负责一个partition,然后每台机器的trainer负责这个partition的通过这个参数更细粒度的切分。
如何获取当前partition当前trainer_id的node_id呢,利用dgl.distributed.node_split()函数。
code如下
##Test Server and Client
def example2():
    """Run the hierarchical server/client test in distributed mode."""
    import os
    from utils import reset_envs
    reset_envs()
    os.environ['DGL_DIST_MODE'] = 'distributed'
    # shared_mem=False, 1 server, 4 clients.
    check_server_client_hierarchy(False, 1, 4)
def check_server_client_hierarchy(shared_mem, num_servers, num_cients):
    """Partition with trainer-aware partitioning, launch server and client
    subprocesses, and verify the per-trainer splits jointly cover the masks.

    NOTE(review): ``num_cients`` is a typo for ``num_clients`` in the original
    notes; kept as-is because it is passed positionally by example2().
    Indentation reconstructed from an unindented paste.
    """
    from dgl.distributed import partition_graph
    import multiprocessing as mp
    import numpy as np
    prepare_dist(num_server=num_servers)
    g = create_random_graph(101)
    num_parts=1
    graph_name = 'dist_graph_test_2'
    g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
    g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
    # num_trainers_per_machine makes partition_graph record a 'trainer_id'
    # per node/edge, consumed later by node_split/edge_split.
    partition_graph(g, graph_name, num_parts, 'dist_graph', num_trainers_per_machine=num_cients)
    serv_ps = []
    ctx = mp.get_context('spawn')
    for serv_id in range(num_servers):
        p = ctx.Process(target=run_server, args=(graph_name, serv_id,
                                                 num_servers, num_cients,
                                                 shared_mem))
        serv_ps.append(p)
        p.start()
    manager = mp.Manager()
    return_dict = manager.dict()
    # Randomly mark ~10% of nodes/edges; the clients must jointly cover
    # exactly these IDs.
    node_mask = np.zeros((g.number_of_nodes(),), np.int32)
    edge_mask = np.zeros((g.number_of_edges(),), np.int32)
    nodes = np.random.choice(g.number_of_nodes(), g.number_of_nodes() // 10, replace=False)
    edges = np.random.choice(g.number_of_edges(), g.number_of_edges() // 10, replace=False)
    node_mask[nodes] = 1
    edge_mask[edges] = 1
    nodes = np.sort(nodes)
    edges = np.sort(edges)
    print(nodes)
    cli_ps = []
    for cli_id in range(num_cients):
        print('start client: ', cli_id)
        p = ctx.Process(target=run_client_hierarchy, args=(graph_name, 0, num_servers, node_mask, edge_mask, return_dict))
        p.start()
        cli_ps.append(p)
    for p in cli_ps:
        p.join()
    for p in serv_ps:
        p.join()
    # Gather each trainer's share and check the union equals the masked sets.
    node1 = []
    edge1 = []
    for n,e in return_dict.values():
        node1.append(n)
        edge1.append(e)
    print(node1)
    node1, _ = F.sort_1d(F.cat(node1, 0))
    edge1, _ = F.sort_1d(F.cat(edge1, 0))
    assert np.all(F.asnumpy(node1) == nodes)
    assert np.all(F.asnumpy(edge1) == edges)
    print("clients have terminated")
def run_client_hierarchy(graph_name, part_id, num_servers, node_mask, edge_mask, return_dict):
    """Client process: build a DistGraph and record this trainer's node/edge split.

    Results are stored into ``return_dict`` keyed by the client's global rank.
    Indentation reconstructed from an unindented paste.
    """
    import os
    from dgl.distributed import DistGraph, load_partition_book, node_split, edge_split
    os.environ['DGL_NUM_SERVER'] = str(num_servers)
    dgl.distributed.initialize("kv_ip_config.txt")
    gpb, graph_name , _, _ = load_partition_book('dist_graph/{}.json'.format(graph_name),
                                                 part_id, None)
    g = DistGraph(graph_name, gpb)
    node_mask = F.tensor(node_mask)
    edge_mask = F.tensor(edge_mask)
    # trainer_id (added by partition_graph with num_trainers_per_machine)
    # steers each masked ID to the trainer responsible for it.
    nodes = node_split(node_mask, g.get_partition_book(), node_trainer_ids=g.ndata['trainer_id'])
    edges = edge_split(edge_mask, g.get_partition_book(), edge_trainer_ids=g.edata['trainer_id'])
    # print(node_mask)
    # node_trainer_ids=g.ndata['trainer_id']
    # print(node_trainer_ids[0:len(node_trainer_ids)])
    print(nodes)
    rank = g.rank()
    # print("rank is equal to g._client.client_id, which is role.get_global_rank() in fact.")
    # print(rank)
    # print(g._client.client_id)
    return_dict[rank] = (nodes, edges)
通过node_split处理之后会返回一个tensor,tensor的值即为这个trainer应当负责的node_id。
about role in DGL同时,这个rank和client_id其实都是通过initialize函数中定义的init_role的role.get_global_rank()的值。
这个role其实就是一个进程,而一个进程以后会初始化为一个KVClient。
官方原话,就是简简单单理解成rank,即可~
multiprocessing.Manager
Each process is associated with a role so that we can determine what
function can be invoked in a process. For example, we do not allow some
functions in sampler processes.
The initialization includes registeration the role of the current process and
get the roles of all client processes. It also computes the rank of all client
processes in a deterministic way so that all clients will have the same rank for
the same client process.
可以看到,在每台机器需要多个进程来放置多个trainer的时候,如果需要共享memory,可以使用multiprocessing中的Manager来定义namespace,list,dict来共享数据。
下面还有一个小示例关于Manager的使用,code来源于此
#test multiprocessing manager
import multiprocessing
import time
def f(ns, ls, di):
    """Mutate a Namespace, a list and a dict from a child process.

    Demonstrates which updates propagate through multiprocessing.Manager
    proxies: plain-value assignments and managed containers do; in-place
    edits to *unmanaged* nested containers are lost unless the container
    is copied out, mutated, and assigned back.
    """
    # Namespace: rebinding an attribute propagates; mutating ns.y in place
    # does not, hence ns.z is copied out, mutated, and assigned back.
    ns.x += 1
    ns.y[0] += 1
    staged = ns.z
    staged[0] += 1
    ns.z = staged
    # List proxy: index assignment propagates.
    ls[0] += 1
    ls[1][0] += 1          # unmanaged nested list: change is lost
    inner = ls[2]          # unmanaged...
    inner[0] += 1
    ls[2] = inner          # ...but assigned back, so it sticks
    ls[3][0] += 1          # managed nested list: direct edit works
    # Dict proxy: same rules as the list proxy.
    di[0] += 1
    di[1][0] += 1          # unmanaged, not assigned back
    entry = di[2]          # unmanaged...
    entry[0] += 1
    di[2] = entry          # ...but assigned back
    di[3][0] += 1          # managed, direct manipulation
if __name__ == '__main__':
    manager = multiprocessing.Manager()
    # Namespace proxy: attribute rebinds propagate; in-place mutation of an
    # attribute's contents does not (f() shows the assign-back workaround).
    ns = manager.Namespace()
    ns.x = 1
    ns.y = [1]
    ns.z = [1]
    # Mix managed (manager.list) and plain nested containers to show which
    # nested updates become visible across processes.
    ls = manager.list([1, [1], [1], manager.list([1])])
    di = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])})
    # BUG FIX: the original notes had sep='n' — the backslash was lost when
    # pasting; the fields should be separated by newlines.
    print('before', ns, ls, ls[2], ls[3][0], di, di[2], di[3][0], sep='\n')
    p = multiprocessing.Process(target=f, args=(ns, ls, di))
    p.start()
    p.join()
    print('after', ns, ls, ls[2], ls[3][0], di, di[2], di[3][0], sep='\n')
这个示例的意思就是:如果在一个dict、list、namespace中存放的是普通类型数据(如int),则可以在其他进程中更改;若不是普通类型而是python的list类型,则更改了也不会生效。解决方法有
把这个list改成了Manager.list的类型定义一个list然后直接把这个list赋值给原list
上面代码运行的结果为
before Namespace(x=1, y=[1], z=[1]) [1, [1], [1], ] [1] 1 {0: 1, 1: [1], 2: [1], 3: } [1] 1 after Namespace(x=2, y=[1], z=[2]) [2, [1], [2], ] [2] 2 {0: 2, 1: [1], 2: [2], 3: } [2] 2
Standalone mode
Standalone模式是一个测试模式,这个模式不开Server,直接开Client,并且直接来读partition并不需要connect_to_server。
在dgl.distributed.initialize中也可以看到,这种模式下不能开Sampler进程
is_standalone = os.environ.get(
'DGL_DIST_MODE', 'standalone') == 'standalone'
if num_workers > 0 and not is_standalone:
SAMPLER_POOL = CustomPool(num_workers, (ip_config, num_servers, max_queue_size,
net_type, 'sampler', num_worker_threads))
else:
SAMPLER_POOL = None
-----------
if not is_standalone:
assert num_servers is not None and num_servers > 0,
'The number of servers per machine must be specified with a positive number.'
# print("connect count add")
connect_to_server(ip_config, num_servers, max_queue_size, net_type)
# print("connect count add")
init_role('default')
init_kvstore(ip_config, num_servers, 'default')
在后面的init_kvstore中,其实在这个模式下初始化的KVCLient并不是正常的KVClient而是一个fakeCLient,在standalone_kvstore.py中定义。
def init_kvstore(ip_config, num_servers, role):
    """initialize KVStore"""
    global KVCLIENT
    if KVCLIENT is not None:
        return
    # Standalone mode gets an in-process fake client; otherwise a real
    # networked KVClient is created.
    standalone = os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone'
    KVCLIENT = SA_KVClient() if standalone else KVClient(ip_config, num_servers, role)
很多属性这个fakeClient是没有定义的。
同时在定义DistGraph是,partitions个数只能为1。
if os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone':
assert part_config is not None,
'When running in the standalone model, the partition config file is required'
self._client = get_kvstore()
assert self._client is not None,
'Distributed module is not initialized. Please call dgl.distributed.initialize.'
同时在添加feat数据到数据库时,由于没有Server定义好数据库,只能自己一个个定义
for name in node_feats:
# The feature name has the following format: node_type + "/" + feature_name.
ntype, feat_name = name.split('/')
self._client.add_data(str(HeteroDataName(True, ntype, feat_name)),
node_feats[name],
NodePartitionPolicy(self._gpb, ntype=ntype))
for name in edge_feats:
# The feature name has the following format: edge_type + "/" + feature_name.
etype, feat_name = name.split('/')
self._client.add_data(str(HeteroDataName(False, etype, feat_name)),
edge_feats[name],
EdgePartitionPolicy(self._gpb, etype=etype))
如果有server,在DistGraphServer这些会在主Server中直接被初始化。code如下
if not self.is_backup_server():
for name in node_feats:
# The feature name has the following format: node_type + "/" + feature_name to avoid
# feature name collision for different node types.
# example:"_N/your_node_feature_name"
# print(name)
ntype, feat_name = name.split('/')
data_name = HeteroDataName(True, ntype, feat_name)
self.init_data(name=str(data_name), policy_str=data_name.policy_str,
data_tensor=node_feats[name])
for name in edge_feats:
# The feature name has the following format: edge_type + "/" + feature_name to avoid
# feature name collision for different edge types.
etype, feat_name = name.split('/')
data_name = HeteroDataName(False, etype, feat_name)
self.init_data(name=str(data_name), policy_str=data_name.policy_str,
data_tensor=edge_feats[name])
Heterogeneous Graph



