
DGL distgraph store OP

Contents

DistGraph Server and Client
    Server (with comparison)
        RPC OP
        KVStore OP
    Client (with comparison)
        RPC OP
        MultiRPC OP
        KVStore OP
        Multi-KVStore OP
DistGraph
    ndata
    node_attr_schemes
    _ndata_store
    _client._data_store
    _ntype_map
    _get_ndata_names
    node_split
    even or uneven
        force_even=False
        force_even=True
DistEmbedding
    SparseAdagrad
DistTensor
    initialization
        _name and _part_policy
    add as DistGraph feature
    add as DistGraph feature with specific policy
    reference to one that exists
    multiple definitions
    about shared_memory
Multi-Client and trainers
    partition and node_split
    about role in DGL
    multiprocessing.Manager
Standalone mode
Heterogeneous Graph

DistGraph Server and Client

Server (with comparison)

The server side here is started directly with DGL's built-in DistGraphServer. The code:

def run_server(graph_name, serv_id, num_servers, num_clients, shared_mem):
    from dgl.distributed import DistGraphServer
    g = DistGraphServer(server_id=serv_id, ip_config="kv_ip_config.txt",
                        num_servers = num_servers, num_clients = num_clients,
                        part_config="dist_graph/{}.json".format(graph_name),
                        disable_shared_mem=not shared_mem,
                        graph_format=['csc', 'coo'])
    print('start server', serv_id)
    g.start()

DistGraphServer is a subclass of KVServer. The class sets up things like backup servers, shared memory, load_partition and init_data, much as we have seen before. Now look at its start function:

    def start(self):
        """ Start graph store server.
        """
        # start server
        server_state = ServerState(kv_store=self, local_g=self.client_g, partition_book=self.gpb)
        print('start graph service on server {} for part {}'.format(self.server_id, self.part_id))
        start_server(server_id=self.server_id,
                     ip_config=self.ip_config,
                     num_servers=self.num_servers,
                     num_clients=self.num_clients, server_state=server_state)

The familiar recipe: build a ServerState, then call start_server, and the server is up.
Now compare with how the server was defined earlier in the RPC OP and KVStore OP notes.

RPC OP
def start_server(num_client, ip_config, server_id = 0):
    import time
    print("Sleep 2 seconds to test client re-connect")
    time.sleep(2)
    server_state = dgl.distributed.ServerState(None, local_g=None, partition_book=None)
    dgl.distributed.register_service(HELLO_SERVICE_ID, HelloRequest, HelloResponse)
    print("Start server {}".format(server_id))
    dgl.distributed.start_server(server_id = server_id,
                                ip_config = ip_config,
                                num_servers= 1 ,
                                num_clients= num_client,
                                server_state = server_state)

A very plain use of start_server: nothing is set inside ServerState, because there is no KV store interaction at all, only message passing.

KVStore OP
def start_server(server_id, num_clients, num_servers):
    import time
    print("Sleep 5 seconds to test client re-connect")
    time.sleep(5)
    kvstore = dgl.distributed.KVServer(server_id = server_id,
                                        ip_config="kv_ip_config.txt",
                                        num_servers = num_servers,
                                        num_clients = num_clients)
    kvstore.add_part_policy(node_policy)
    kvstore.add_part_policy(edge_policy)
    if kvstore.is_backup_server():
        kvstore.init_data('data_0', 'node:_N')
        kvstore.init_data('data_0_1', 'node:_N')
        kvstore.init_data('data_0_2', 'node:_N')
        kvstore.init_data('data_0_3', 'node:_N')
    else: 
        kvstore.init_data('data_0', 'node:_N', data_0)
        kvstore.init_data('data_0_1', 'node:_N', data_0_1)
        kvstore.init_data('data_0_2', 'node:_N', data_0_2)
        kvstore.init_data('data_0_3', 'node:_N', data_0_3)
    server_state = dgl.distributed.ServerState(kv_store = kvstore, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id = server_id,
                                ip_config="kv_ip_config.txt",
                                num_servers = num_servers,
                                num_clients = num_clients,
                                server_state = server_state)

A KV store is defined and some of its entries are initialized, but no graph is stored.

Client (with comparison)

Here initialize and DistGraph are used to start a client:

def run_client(graph_name, part_id, num_servers, num_clients, num_nodes, num_edges):
    import os
    from dgl.distributed import load_partition_book,DistGraph
    os.environ['DGL_NUM_SERVER'] = str(num_servers)
    dgl.distributed.initialize('kv_ip_config.txt')
    gpb, graph_name,_ , _ = load_partition_book('dist_graph/{}.json'.format(graph_name), part_id, None)
    print(graph_name)
    g= DistGraph(graph_name, gpb)
    check_dist_emb(g, num_clients, num_nodes, num_edges)

The important part of initialize is connect_to_server, which connects the process to the server. DistGraph then calls get_kvstore() in its __init__(), and also calls _get_graph_from_shared_mem to obtain a graph from the server; this graph has no feature tensors, but it does carry 'inner_node', 'inner_edge' and the node/edge IDs.
DistGraph itself is important enough to get its own section below.

RPC OP
def start_client(ip_config):
    from numpy.testing import assert_array_equal
    dgl.distributed.register_service(HELLO_SERVICE_ID, HelloRequest, HelloResponse)
    dgl.distributed.connect_to_server(ip_config = ip_config, num_servers=1)

This only calls connect_to_server to connect to the server; after that, the registered Request can be sent over RPC and the Response received.

MultiRPC OP

  • The bug from that experiment is still unresolved.

    KVStore OP
    def start_client(num_clients, num_servers):
        import os
        import time
        os.environ['DGL_DIST_MODE'] = 'distributed'
        dgl.distributed.initialize("kv_ip_config.txt")
        kvclient = dgl.distributed.KVClient("kv_ip_config.txt",
                                            num_servers = num_servers)
        kvclient.map_shared_data(partition_book=gpb)
    

    Here initialize makes the connection and KVClient starts a KV store client. Note that the gpb passed to map_shared_data is actually a global variable, which suggests that the shared memory here is the gpb of the same machine.

    Multi-KVStore OP
    def start_client_mul_role(i, config):
        import os
        import time
        os.environ['DGL_DIST_MODE'] = 'distributed'
        dgl.distributed.initialize(config)
        if i==0:
            time.sleep(5)
        kvclient = dgl.distributed.kvstore.get_kvstore()
        kvclient.barrier()
        print(kvclient.client_id, kvclient.role, kvclient.machine_id)
        print(dgl.distributed.get_num_client())
        print("i: %d role: %s" % (i, kvclient.role))
        assert dgl.distributed.role.get_num_trainers() == 2
        assert dgl.distributed.role.get_trainer_rank() < 2
        print('trainer rank: %d, global rank: %d' % (dgl.distributed.role.get_trainer_rank(),
                                                     dgl.distributed.role.get_global_rank()))
        dgl.distributed.exit_client()
    

    Here get_kvstore() retrieves the KVClient that was created inside initialize. Every client has a role, a machine_id and a client_id, all set in init_role; they are used to distinguish Sampler clients from Trainer clients.

    DistGraph

    DistGraph is simply the distributed version of a DGL graph, and it runs on the client. About its relationship with partitions, the DGL docs say:

    DistGraph provides a global view of the distributed graph. Internally,
    it may contains a partition of the graph if it is co-located with
    the server. When servers and clients run on separate sets of machines,
    this returns None.

    That is the docstring of DistGraph's local_partition property; its return value is self._g, the graph obtained from the server via _get_graph_from_shared_mem. This again suggests that the shared memory only works within a single machine.
    In addition, NodeDataView and EdgeDataView let us look at the graph's ndata and edata.
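
    A quick sketch of inspecting both, assuming a client set up like run_client above (exact fields depend on the DGL version):

        # hypothetical snippet, run inside a client process after DistGraph construction
        local_g = g.local_partition          # the per-machine partition, or None if remote
        if local_g is not None:
            # the shared-memory graph carries IDs and inner_node/inner_edge masks,
            # but no feature tensors
            print(local_g.ndata[dgl.NID][0:5])
            print(local_g.ndata['inner_node'][0:5])
        print(list(g.ndata.keys()))          # the distributed features live in g.ndata
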

  • KVClient.map_shared_data(gpb) was mentioned earlier, but what exactly does this function do? Is it shared by all clients? And when is this gpb read in during distributed training? This still needs more investigation.
  • See the about shared_memory section below.

    ndata

    Fetch a feature (through the NodeDataView class), for example:

    g.ndata['feature_name']
    
    node_attr_schemes

    Fetch the node attribute schemes: only names, dtypes and shapes, no actual feature values. For example:

        # Test write data
        new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
        g.ndata['test1'][nids] = new_feats
        feats = g.ndata['test1'][nids]
        assert np.all(F.asnumpy(feats) == 1)
    
        # Test metadata operations.
        assert len(g.ndata['features']) == g.number_of_nodes()
        assert g.ndata['features'].shape == (g.number_of_nodes(), 1)
        assert g.ndata['features'].dtype == F.int64
        assert g.node_attr_schemes()['features'].dtype == F.int64
        assert g.node_attr_schemes()['test1'].dtype == F.int32
        assert g.node_attr_schemes()['features'].shape == (1,)
    
    _ndata_store
        g = DistGraph(graph_name, gpb=gpb)
        print(g._ndata_store)
        for ntype in g._ndata_store:
            print(ntype)
            print(g._ndata_store[ntype])
    

    Result: all of the named node features are pulled out (unnamed ones are not!):

    {'features': }
    features
    
    

    A DistTensor cannot be printed directly, but you can index it with a 1-D tensor to pull out a slice and print that.
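
    For instance, a sketch like this (assuming the 'features' DistTensor from above and the PyTorch backend):

        import torch
        feat = g.ndata['features']    # a DistTensor; printing it directly is not informative
        idx = torch.arange(0, 5)      # a 1-D index tensor
        print(feat[idx])              # indexing pulls the slice into a local tensor
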

    _client._data_store

    Going through the DistGraph's KVClient to access the KVStore directly returns every data name plus its feature tensor in the store (node and edge data, including the unnamed/anonymous ones, and it can be printed directly).
    Result:

    {'node:_N:features': tensor([...]),
     'edge:_E:features': tensor([...]),
     'node:_N:anonymous-default-0': tensor([...]),
     ...}
    

    Very detailed.

    _ntype_map
    self._ntype_map = {ntype:i for i, ntype in enumerate(self.ntypes)}
        def ntypes(self):
            """Return the list of node types of this graph.
    
            Returns
            -------
            list of str
    
            Examples
            --------
    
            >>> g = DistGraph("test")
            >>> g.ntypes
            ['_U']
            """
            return self._gpb.ntypes
    

    Result:

    {'_N': 0}
    
    _get_ndata_names
        names = g._get_ndata_names()
        print(names)
        for name in names:
            print(name)
            dtype, shape, policy = g._client.get_data_meta(str(name))
            print(dtype, shape, policy)
    

    Result:

    []
    node:_N:features
    torch.int64 (101, 1) 
    

    The return value cannot be printed directly, but you can iterate over it to get the values inside.
    For Partition, refer to the partition part of the DGL KVStore notes.

  • When there is time, it would be worth reading the source of Basic/RangePartitionBook, PartitionPolicy, NodePartition and EdgePartition again.

    node_split

    Given a boolean mask tensor and the graph's PartitionBook, this function returns a tensor whose values are the selected node IDs.
    Example:

        selected_nodes = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
        # Test node split
        nodes = node_split(selected_nodes, g.get_partition_book())
        print(selected_nodes)
        print(nodes)
    

    Result:

    [False  True  True  True False False False False False  True False  True
     False  True  True False  True  True  True  True  True False  True  True
     False  True  True  True  True False  True  True  True False False  True
      True False False  True  True False  True  True  True False  True False
      True  True  True  True  True  True  True  True  True  True  True  True
      True False  True  True False False False  True False False  True  True
      True  True False False False  True  True False  True  True False  True
     False  True  True  True  True False  True  True  True False  True  True
      True  True  True False False]
    tensor([ 1,  2,  3,  9, 11, 13, 14, 16, 17, 18, 19, 20, 22, 23, 25, 26, 27, 28,
            30, 31, 32, 35, 36, 39, 40, 42, 43, 44, 46, 48, 49, 50, 51, 52, 53, 54,
            55, 56, 57, 58, 59, 60, 62, 63, 67, 70, 71, 72, 73, 77, 78, 80, 81, 83,
            85, 86, 87, 88, 90, 91, 92, 94, 95, 96, 97, 98])
    

    The Multi-Client and trainers section later describes what happens when the num_trainers_per_machine parameter is added.

    even or uneven

    The node_split function has a very important parameter, force_even, which defaults to False.
    According to the official description:

    force_even=False: maximize locality, i.e. each rank gets exactly the nodes that are both inner nodes of its partition and selected in node_mask.
    force_even=True: split the nodes across all processes as evenly as possible, so that the per-process counts add up to the total number of nodes selected in node_mask; in this case some nodes assigned to a rank may not be inner nodes of its partition, but locality is still preserved as much as possible.

    An example of each:

    force_even=False

    def example3():
        import numpy as np
        from dgl.distributed import partition_graph
        from dgl.distributed import load_partition, node_split, edge_split
        g= create_random_graph(101)
        num_parts = 4
        num_hops = 2
        partition_graph(g, 'dist_graph_test', num_parts, 'dist_graph', num_hops=num_hops, part_method='metis')
    
        node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
        edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
        ## np.nonzero return the x-index and y-index of nonzero position of a 2d vector 
        selected_nodes = np.nonzero(node_mask)[0]
        selected_edges = np.nonzero(edge_mask)[0]
    
        def set_roles(num_clients):
            dgl.distributed.role.CUR_ROLE = 'default'
            dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
            dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}
    
        num_part = 1
        for i in range(num_parts):
            set_roles(num_parts)
            part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition('dist_graph/dist_graph_test.json', i)
            # print(part_g.ndata['inner_node'])
            # print(part_g.ndata)
            local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
            # print(local_nids)
            local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
            # print(local_nids)
            ## np.intersect1d return a list of the same value of two 1d vector
            nodes1 = np.intersect1d(selected_nodes, F.asnumpy(local_nids))
            nodes2 = node_split(node_mask, gpb, rank=i, force_even=False)
            assert np.all(np.sort(nodes1) == np.sort(F.asnumpy(nodes2)))
            local_nids = F.asnumpy(local_nids)
            for n in nodes1:
                assert n in local_nids   
    
    force_even=True
    def example4():
        from dgl.distributed import partition_graph, load_partition, node_split, edge_split
        import numpy as np
        g = create_random_graph(101)
        num_parts = 4
        num_hops = 2
        partition_graph(g, 'dist_graph_test', num_parts, 'dist_graph', num_hops, part_method='metis')
    
        node_mask = np.random.randint(0, 100, size=g.number_of_nodes()) > 30
        edge_mask = np.random.randint(0, 100, size=g.number_of_edges()) > 30
        selected_nodes = np.nonzero(node_mask)[0]
        selected_edges = np.nonzero(edge_mask)[0]
        all_nodes1 = []
        all_nodes2 = []
        all_edges1 = []
        all_edges2 = []
    
        def set_roles(num_clients):
            dgl.distributed.role.CUR_ROLE = 'default'
            dgl.distributed.role.GLOBAL_RANK = {i:i for i in range(num_clients)}
            dgl.distributed.role.PER_ROLE_RANK['default'] = {i:i for i in range(num_clients)}
    
        for i in range(num_parts):
            set_roles(num_parts)
            part_g, node_feats, edge_feats, gpb, _, _, _ = load_partition('dist_graph/dist_graph_test.json', i)
            local_nids = F.nonzero_1d(part_g.ndata['inner_node'])
            local_nids = F.gather_row(part_g.ndata[dgl.NID], local_nids)
            nodes = node_split(node_mask, gpb, rank=i, force_even=True)
            # nodes_uneven = node_split(node_mask, gpb, rank=i, force_even=False)
            # print(nodes, nodes_uneven)
            all_nodes1.append(nodes)
            subset = np.intersect1d(F.asnumpy(nodes), F.asnumpy(local_nids))
            print('part {} get {} nodes and {} are in the partition'.format(i, len(nodes), len(subset)))
            ......
        all_nodes1 = F.cat(all_nodes1, 0)
        all_edges1 = F.cat(all_edges1, 0)
        all_nodes2 = F.cat(all_nodes2, 0)
        all_edges2 = F.cat(all_edges2, 0)
        all_nodes = np.nonzero(node_mask)[0]
        all_edges = np.nonzero(edge_mask)[0]
        assert np.all(all_nodes == F.asnumpy(all_nodes1))
        assert np.all(all_edges == F.asnumpy(all_edges1))
        assert np.all(all_nodes == F.asnumpy(all_nodes2))
        assert np.all(all_edges == F.asnumpy(all_edges2))
    
    DistEmbedding

    DistEmbedding never came up in the earlier distributed-training notes, but here it gets its own section and is used together with the SparseAdagrad optimizer, which strongly suggests that this OP is related to the GNN weights. Its parameters are:

    num_embeddings: equal to the number of nodes or edges

  • This is the point I understand least: if this were a weight, it should only depend on the dimensionality, yet here it equals the number of nodes, which makes it feel very much like a feature tensor. Also, the definition of DistEmbedding really just creates a DistTensor, passing (num_embeddings, embedding_dim) as the shape, and many of DistEmbedding's attributes are taken straight from that DistTensor.

    embedding_dim: the embedding dimensionality
    name: the name
    init_func: the initialization function; zeros if not provided
    part_policy: the partition policy, ignored for now

    SparseAdagrad

    The optimizer; its parameters are:

    params: a list of dgl.distributed.DistEmbedding
    lr: the learning rate

    This optimizer seems to come with a kind of regularization built in, because the effective update at each step differs from the raw gradient. According to the official explanation:
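
    Roughly, a textbook Adagrad step looks like the sketch below (my own sketch of what SparseAdagrad presumably does for each embedding row, not DGL's actual code):

        import torch

        def adagrad_step(param, grad, state_sum, lr=0.1, eps=1e-10):
            # textbook Adagrad: scale each update by the accumulated squared gradients
            state_sum += grad * grad
            std = state_sum.sqrt() + eps
            param -= lr * grad / std
            return param, state_sum
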

    The latter (accumulated) term is probably the reason.
    Putting the two together, an example:

    def check_dist_emb(g, num_clients, num_nodes, num_edges):
        from dgl.distributed.optim.pytorch import SparseAdagrad
        from dgl.distributed.nn.pytorch import DistEmbedding
        import numpy as np
        import time
        from numpy.testing import assert_almost_equal
        try:
            if g._client.client_id == 0:
                time.sleep(0.3)
            else:
                time.sleep(0.5)
            ## refer to https://docs.dgl.ai/api/python/dgl.distributed.html?highlight=distembedding#dgl.distributed.DistEmbedding
            emb = DistEmbedding(g.number_of_nodes(), 1, 'emb1', emb_init)
            # print("emb")
            nids = F.arange(0, int(g.number_of_nodes()))
            lr = 0.1
            optimizer = SparseAdagrad([emb], lr)
            ##SparseAdagrad will add regularization item automatically
            #With multi-clients setting, racing will cause embedding update confusing, and will affect the next step. 
            with F.record_grad():
                feats = emb(nids)
                # print(np.shape(F.asnumpy(feats)))
                assert np.all(F.asnumpy(feats)==np.zeros((len(nids), 1)))
                loss = F.sum(feats + 1, 0)
            loss.backward()
            optimizer.step()
            with F.record_grad():
                feats = emb(nids)
                print(feats)
                loss = F.sum(feats + 1, 0)
            loss.backward()
            optimizer.step()
            with F.record_grad():
                feats = emb(nids)
                print(feats)
                loss = F.sum(feats + 1, 0)
            loss.backward()
            optimizer.step()
            feats = emb(nids)
            print(feats)
    

    This just updates emb three times. One interesting point: with several clients there is a race, and the embedding updates can become interleaved out of order (all clients share the one DistEmbedding, so once one client updates it, every other client that accesses it sees the changed values), and this disorder keeps affecting the subsequent updates.
    Also, while testing groups of clients, my DGL version does not support DistGraphServer's keep_alive parameter, so the server shuts down automatically once the first group of clients finishes. I had to restart the server manually so that the second group of clients could connect and run to completion. At that point the DistEmbedding had been reset, which means the DistEmbedding is shared through the server: when the server goes away, the DistEmbedding is destroyed with it, and when the server is started again the DistEmbedding is initialized again.
    While debugging the groups I wanted to reproduce this effect, but could not. I then tried starting two servers one after another, launching the second server after the first group's client finished so the other client in the group could connect to it, but this ran into an error I used to see all the time when running distributed GraphSAGE:
    The port is already in use! Clearly both server processes want to occupy port 10000, so the second server fails at startup.

  • Fixing this probably requires joining every Server/Client process one by one, which is too much hassle; I will try it some other time.

    DistTensor

    initialization

    _name and _part_policy

    Defining a DistTensor only requires a shape and a dtype; if no init_func is given, everything is initialized to 0.
    If no name is given when defining a DistTensor, it is named 'node:_N'/'edge:_E' + 'anonymous' + get_role() (normally 'default') + a dist_tensor_id starting from 0; if a name is given (say test_name), the name becomes 'node:_N'/'edge:_E' + that name. Also, when a DistGraph is created, its original feature tensors are automatically registered as DistTensors under their original names.
    For example:

        new_shape = (g.number_of_nodes(), 2)
        test1 = dgl.distributed.DistTensor(new_shape, F.int32)
        print(test1._part_policy, test1._name)
        g.ndata['test1'] = test1
        feats = g.ndata['test1'][nids]
        assert np.all(F.asnumpy(feats) == 0)
        assert test1.count_nonzero() == 0
    

    The printed result is:

     node:_N:anonymous-default-0
    

    I kept wondering here: I never defined its partition_policy, so how does it know to use the NodePartitionPolicy?

  • That is because when a DistTensor is defined, DGL looks for every policy that could match, and if two policies match it raises 'Multiple partition policies match the input shape. Please provide a partition policy explicitly.' I recognized this at once: it is exactly the error I got earlier when I generated a graph with 100 nodes (edge probability 0.1), i.e. my random graph had 100 nodes and 100 edges, so there were two possible matches, NodePartitionPolicy and EdgePartitionPolicy. In other words, 'finding the possible policies' just means matching the DistTensor's first dimension against num_nodes or num_edges. A sketch of the explicit-policy workaround follows below.
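
    A minimal sketch of that workaround (hypothetical tensor name 'test_amb'; g and F as in the surrounding examples; the part_policy keyword is assumed), naming the policy explicitly so DGL does not have to guess:

        # when num_nodes == num_edges, the shape alone is ambiguous,
        # so spell out the node policy instead of letting DGL infer it
        policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
        test_amb = dgl.distributed.DistTensor((g.number_of_nodes(), 2), F.int32,
                                              'test_amb', part_policy=policy)
        print(test_amb._part_policy, test_amb._name)
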

    add as DistGraph feature

    The example below uses DistTensor to add a tensor directly as a feature of the DistGraph.
    The code:

    def check_dist_graph_empty(g, num_clients, num_nodes, num_edges):
        # Test API
        import numpy as np
        assert g.number_of_nodes() == num_nodes
        assert g.number_of_edges() == num_edges
    
        # Test init node data
        new_shape = (g.number_of_nodes(), 2)
        g.ndata['test1'] = dgl.distributed.DistTensor(new_shape, F.int32)
        nids = F.arange(0, int(g.number_of_nodes() / 2))
        feats = g.ndata['test1'][nids]
        assert np.all(F.asnumpy(feats) == 0)
    
        # create a tensor and destroy a tensor and create it again.
        test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test3', init_func=rand_init)
        del test3
        test3 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test3')
        del test3
    
        # Test write data
        new_feats = F.ones((len(nids), 2), F.int32, F.cpu())
        g.ndata['test1'][nids] = new_feats
        feats = g.ndata['test1'][nids]
        # print(feats)
        assert np.all(F.asnumpy(feats) == 1)
    
        # Test metadata operations.
        assert g.node_attr_schemes()['test1'].dtype == F.int32
        print(g.node_attr_schemes())
    
        print('end')
    
    add as DistGraph feature with specific policy

    A distributed tensor object is sharded and stored across the machines in the cluster, and the sharding follows the partition policy; see the earlier KVStore notes on how the data is distributed.
    An example:

            rest = np.setdiff1d(np.arange(g.number_of_nodes()), F.asnumpy(nids))
            # print(rest)
            feats1 = emb(rest)
            assert np.all(F.asnumpy(feats1)==np.zeros((len(rest), 1)))
    
            policy = dgl.distributed.PartitionPolicy('node', g.get_partition_book())
            # print("partition_book is RangePartitionBook ",g.get_partition_book())
            grad_sum = dgl.distributed.DistTensor((g.number_of_nodes(),1), F.float32, 'emb1_sum', policy)
            if num_clients == 1:
                assert np.all(F.asnumpy(grad_sum[nids]) == np.ones((len(nids), 1)) * num_clients)
                # print(grad_sum[nids])
            assert np.all(F.asnumpy(grad_sum[rest]) == np.zeros((len(rest), 1)))  
    
    reference to one that exists
        # reference to one that exists
        test2 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2', init_func=rand_init)
        test3 = dgl.distributed.DistTensor(new_shape, F.float32, 'test2')
        assert np.all(F.asnumpy(test2[nids]) == F.asnumpy(test3[nids]))
    

    Quite simple: reference an existing tensor by using the same name.

    multiple definitions

    Here is another example of defining a tensor multiple times:

        # add tests for anonymous distributed tensor.
        test3 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
        data = test3[0:10]
        test4 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
        print(test4._name, test4[nids])
        del test3
        test5 = dgl.distributed.DistTensor(new_shape, F.float32, init_func=rand_init)
        assert np.sum(F.asnumpy(test5[0:10] != data)) > 0
    
        # test a persistent tensor
        test4 = dgl.distributed.DistTensor(new_shape, F.float32, 'test4', init_func=rand_init,
                                           persistent=True)
        print(test4._name,test4[nids])
        del test4
        try:
            test4 = dgl.distributed.DistTensor((g.number_of_nodes(), 3), F.float32, 'test4')
            print(test4._name,test4)
            raise Exception('')
        except:
            pass
    

    The output is:

    node:_N:anonymous-default-2 
    node:_N:test4 
    

    The tensor values differ between the two anonymous definitions, so each definition re-initializes the tensor; it is an overwrite. Also, the statements inside the try block do not go through, because the shape does not match; forcing it raises:

    AssertionError: The shape does not match with the existing tensor
    

    This shows that del does not remove the tensor completely, possibly because it was given a name.

    about shared_memory

    While debugging the shared_mem parameter, I found that if it is disabled, then when DistGraphServer starts, i.e. on the server side, it does not run

    _copy_graph_to_shared_mem
    shared_memory

    The two functions above handle the graph and the graph partition book respectively.
    Consequently, the following two functions in the client-side DistGraph return None:

    _get_graph_from_shared_mem
    get_shared_mem_partition_book

  • I recently noticed that graph_partition_book.py, where partitions are defined, contains many shared_memory-related functions, one set for every kind of PartitionPolicy; worth a look when there is time. They are also tied to pickle.

    Multi-Client and trainers

    partition and node_split

    It is worth mentioning that when partitioning the graph you can set the num_trainers_per_machine parameter, which adds attributes to the resulting partitions. I am not sure whether this parameter speeds up partitioning, but after partitioning with it, each partition can be split across num_trainers_per_machine clients on the same machine, which is probably where the parameter gets its name. See the introduction of this parameter in the partition OP notes for details.
    In a multi-machine setting, each machine should be responsible for one partition, and the trainers on that machine each handle a finer-grained slice of it as controlled by this parameter.
    How do we get the node IDs of the current trainer_id within the current partition? With the dgl.distributed.node_split() function.
    The code:

    ##Test Server and Client
    def example2():
        import os
        from utils import reset_envs
        reset_envs()
        os.environ['DGL_DIST_MODE'] = 'distributed'
        check_server_client_hierarchy(False, 1, 4)
    
    def check_server_client_hierarchy(shared_mem, num_servers, num_cients):
        from dgl.distributed import partition_graph
        import multiprocessing as mp
        import numpy as np
        prepare_dist(num_server=num_servers)
        g = create_random_graph(101)
    
        num_parts=1
        graph_name = 'dist_graph_test_2'
        g.ndata['features'] = F.unsqueeze(F.arange(0, g.number_of_nodes()), 1)
        g.edata['features'] = F.unsqueeze(F.arange(0, g.number_of_edges()), 1)
        partition_graph(g, graph_name, num_parts, 'dist_graph', num_trainers_per_machine=num_cients)
    
        serv_ps = []
        ctx = mp.get_context('spawn')
        for serv_id in range(num_servers):
            p = ctx.Process(target=run_server, args=(graph_name, serv_id, 
                                                    num_servers, num_cients,
                                                    shared_mem))
            serv_ps.append(p)
            p.start()
        
        manager = mp.Manager()
        return_dict = manager.dict()
        node_mask = np.zeros((g.number_of_nodes(),), np.int32)
        edge_mask = np.zeros((g.number_of_edges(),), np.int32)
        nodes = np.random.choice(g.number_of_nodes(), g.number_of_nodes() // 10, replace=False)
        edges = np.random.choice(g.number_of_edges(), g.number_of_edges() // 10, replace=False)
        node_mask[nodes] = 1
        edge_mask[edges] = 1
        nodes = np.sort(nodes)
        edges = np.sort(edges)
        print(nodes)
        cli_ps = []
        for cli_id in range(num_cients):
            print('start client: ', cli_id)
            p = ctx.Process(target=run_client_hierarchy, args=(graph_name, 0, num_servers, node_mask, edge_mask, return_dict))
            p.start()
            cli_ps.append(p)
        
        for p in cli_ps:
            p.join()
        for p in serv_ps:
            p.join()
        
        node1 = []
        edge1 = []
        for n,e in return_dict.values():
            node1.append(n)
            edge1.append(e)
        print(node1)
        node1, _ = F.sort_1d(F.cat(node1, 0))
        edge1, _ = F.sort_1d(F.cat(edge1, 0))
        assert np.all(F.asnumpy(node1) == nodes)
        assert np.all(F.asnumpy(edge1) == edges)
    
        print("clients have terminated")
    
        
    
    def run_client_hierarchy(graph_name, part_id, num_servers, node_mask, edge_mask, return_dict):
        import os
        from dgl.distributed import DistGraph, load_partition_book, node_split, edge_split
        os.environ['DGL_NUM_SERVER'] = str(num_servers)
        dgl.distributed.initialize("kv_ip_config.txt")
        gpb, graph_name , _, _ = load_partition_book('dist_graph/{}.json'.format(graph_name),
                                                    part_id, None)
        g = DistGraph(graph_name, gpb)
        node_mask = F.tensor(node_mask)
        edge_mask = F.tensor(edge_mask)
        nodes = node_split(node_mask, g.get_partition_book(), node_trainer_ids=g.ndata['trainer_id'])
        edges = edge_split(edge_mask, g.get_partition_book(), edge_trainer_ids=g.edata['trainer_id'])
        # print(node_mask)
        # node_trainer_ids=g.ndata['trainer_id']
        # print(node_trainer_ids[0:len(node_trainer_ids)])
        print(nodes)
        rank = g.rank()
        # print("rank is equal to g._client.client_id, which is role.get_global_rank() in fact.")
        # print(rank)
        # print(g._client.client_id)
        return_dict[rank] = (nodes, edges)
    

    node_split returns a tensor whose values are the node IDs this trainer should be responsible for.

    about role in DGL

    Meanwhile, this rank and the client_id are in fact both the value of role.get_global_rank() from init_role, which is set up inside the initialize function.
    A role is really just a process, and such a process is later initialized into a KVClient.
    In the docs' own words (it can simply be understood as a rank):

    Each process is associated with a role so that we can determine what
    function can be invoked in a process. For example, we do not allow some
    functions in sampler processes.
    The initialization includes registeration the role of the current process and
    get the roles of all client processes. It also computes the rank of all client
    processes in a deterministic way so that all clients will have the same rank for
    the same client process.

    multiprocessing.Manager

    As you can see, when a machine needs multiple processes to host multiple trainers and they have to share memory, the Manager from multiprocessing can be used to define a Namespace, list or dict for sharing data.
    Below is a small example of using Manager; the code comes from here.

    #test multiprocessing manager
    import multiprocessing
    import time
    
    def f(ns, ls, di):
        ns.x += 1
        ns.y[0] += 1
        ns_z = ns.z
        ns_z[0] += 1
        ns.z = ns_z
    
        ls[0] += 1
        ls[1][0] += 1 # unmanaged, not assigned back
        ls_2 = ls[2]  # unmanaged...
        ls_2[0] += 1
        ls[2] = ls_2  # ... but assigned back
        ls[3][0] += 1 # managed, direct manipulation
    
        di[0] += 1
        di[1][0] += 1 # unmanaged, not assigned back
        di_2 = di[2]  # unmanaged...
        di_2[0] += 1
        di[2] = di_2  # ... but assigned back
        di[3][0] += 1 # managed, direct manipulation
    
    if __name__ == '__main__':
        manager = multiprocessing.Manager()
        ns = manager.Namespace()
        ns.x = 1
        ns.y = [1]
        ns.z = [1]
        ls = manager.list([1, [1], [1], manager.list([1])])
        di = manager.dict({0: 1, 1: [1], 2: [1], 3: manager.list([1])})
    
        print('before', ns, ls, ls[2], ls[3][0], di, di[2], di[3][0], sep='\n')
        p = multiprocessing.Process(target=f, args=(ns, ls, di))
        p.start()
        p.join()
        print('after', ns, ls, ls[2], ls[3][0], di, di[2], di[3][0], sep='\n')
    

    The point of this example: plain values (such as an int) inside a managed dict, list or Namespace can be modified from another process. If the value is not a plain type but a regular Python list, a modification from another process does not take effect. The workarounds are:

    • make the inner list a manager.list as well, or
    • build a local list, modify it, and assign it back to the original slot.

    Running the code above gives:

    before
    Namespace(x=1, y=[1], z=[1])
    [1, [1], [1], ]
    [1]
    1
    {0: 1, 1: [1], 2: [1], 3: }
    [1]
    1
    after
    Namespace(x=2, y=[1], z=[2])
    [2, [1], [2], ]
    [2]
    2
    {0: 2, 1: [1], 2: [2], 3: }
    [2]
    2
    
    Standalone mode

    Standalone mode is a test mode: no server is started, the client is started directly, it reads the partition itself, and connect_to_server is not needed.
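
    As a rough sketch (reusing the graph name and paths from earlier in these notes; details vary by DGL version), a standalone run might look like this:

        import os
        import dgl

        os.environ['DGL_DIST_MODE'] = 'standalone'
        # no server process: initialize() skips connect_to_server in this mode
        dgl.distributed.initialize('kv_ip_config.txt')
        # the single partition is loaded directly through part_config
        g = dgl.distributed.DistGraph('dist_graph_test',
                                      part_config='dist_graph/dist_graph_test.json')
        print(g.number_of_nodes())
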
    You can also see in dgl.distributed.initialize that no sampler processes can be started in this mode:

            is_standalone = os.environ.get(
                'DGL_DIST_MODE', 'standalone') == 'standalone'
            if num_workers > 0 and not is_standalone:
                SAMPLER_POOL = CustomPool(num_workers, (ip_config, num_servers, max_queue_size,
                                                        net_type, 'sampler', num_worker_threads))
            else:
                SAMPLER_POOL = None
                -----------
            if not is_standalone:
                assert num_servers is not None and num_servers > 0, \
                    'The number of servers per machine must be specified with a positive number.'
                # print("connect count add")
                connect_to_server(ip_config, num_servers, max_queue_size, net_type)
                # print("connect count add")
            init_role('default')
            init_kvstore(ip_config, num_servers, 'default')
    

    In the init_kvstore that follows, the KVClient initialized in this mode is not a normal KVClient but a fake client, defined in standalone_kvstore.py.

    def init_kvstore(ip_config, num_servers, role):
        """initialize KVStore"""
        global KVCLIENT
        if KVCLIENT is None:
            if os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone':
                KVCLIENT = SA_KVClient()
            else:
                KVCLIENT = KVClient(ip_config, num_servers, role)
    

    Many attributes are simply not defined on this fake client.
    Also, when defining a DistGraph in this mode, the number of partitions can only be 1:

            if os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone':
                assert part_config is not None, \
                        'When running in the standalone model, the partition config file is required'
                self._client = get_kvstore()
                assert self._client is not None, \
                        'Distributed module is not initialized. Please call dgl.distributed.initialize.'
    

    Likewise, when adding feature data to the KV store, because there is no server that has set the store up, the client has to define each entry itself:

                for name in node_feats:
                    # The feature name has the following format: node_type + "/" + feature_name.
                    ntype, feat_name = name.split('/')
                    self._client.add_data(str(HeteroDataName(True, ntype, feat_name)),
                                          node_feats[name],
                                          NodePartitionPolicy(self._gpb, ntype=ntype))
                for name in edge_feats:
                    # The feature name has the following format: edge_type + "/" + feature_name.
                    etype, feat_name = name.split('/')
                    self._client.add_data(str(HeteroDataName(False, etype, feat_name)),
                                          edge_feats[name],
                                          EdgePartitionPolicy(self._gpb, etype=etype))
    

    If there is a server, these entries are initialized directly on the main server inside DistGraphServer. The code:

            if not self.is_backup_server():
                for name in node_feats:
                    # The feature name has the following format: node_type + "/" + feature_name to avoid
                    # feature name collision for different node types.
                    # example:"_N/your_node_feature_name"
                    # print(name)
                    ntype, feat_name = name.split('/')
                    data_name = HeteroDataName(True, ntype, feat_name)
                    self.init_data(name=str(data_name), policy_str=data_name.policy_str,
                                   data_tensor=node_feats[name])
                for name in edge_feats:
                    # The feature name has the following format: edge_type + "/" + feature_name to avoid
                    # feature name collision for different edge types.
                    etype, feat_name = name.split('/')
                    data_name = HeteroDataName(False, etype, feat_name)
                    self.init_data(name=str(data_name), policy_str=data_name.policy_str,
                                   data_tensor=edge_feats[name])
    
    Heterogeneous Graph

  • TODO