DGL kv_store OP

DT_M 2022-01-26

PartitionBook

For a long time I couldn't figure out what a PartitionBook is. It is simply a class describing how a graph is split into several parts, essentially the mapping of which nodes belong to which part.
The DGL docs actually explain this fairly clearly.

BasicPartitionBook stores the partition ID for every node ID and edge ID, which is memory-hungry, whereas RangePartitionBook computes the mapping instead, exploiting the fact that IDs within one partition are contiguous. The tests below use the Basic variant.
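The difference between the two strategies can be sketched in plain Python (my own illustration, not DGL code): Basic keeps an explicit per-ID table, while Range only stores the boundaries of each partition's contiguous ID range and computes the lookup.

```python
import bisect

# Basic: an explicit table, one entry per node (memory grows with graph size).
node_map_table = [0, 0, 0, 0, 1, 1]   # node i -> partition node_map_table[i]

# Range: only the start of each partition's contiguous ID range is stored.
range_starts = [0, 4]                 # partition 0 owns [0, 4), partition 1 owns [4, 6)

def range_nid2partid(nid):
    """Derive the partition ID from the range boundaries."""
    return bisect.bisect_right(range_starts, nid) - 1

# Both strategies agree on every node.
assert [node_map_table[i] for i in range(6)] == [range_nid2partid(i) for i in range(6)]
```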
The code:

gpb = dgl.distributed.graph_partition_book.BasicPartitionBook(part_id=0,
                                                              num_parts=1,
                                                              node_map=node_map,
                                                              edge_map=edge_map,
                                                              part_graph=g)
node_policy = dgl.distributed.PartitionPolicy(policy_str='node:_N',
                                              partition_book=gpb)
edge_policy = dgl.distributed.PartitionPolicy(policy_str='edge:_E',
                                              partition_book=gpb)

BasicPartitionBook

A few parameters need explaining:

  • part_id: the partition ID of the current part, i.e. of part_graph
  • num_parts: the number of parts the full graph is split into
  • node_map: the mapping from global node IDs to partition IDs
  • edge_map: the same, for edges

So node_map and edge_map are identical across all partitions.

PartitionPolicy

The policy that further splits a partition by node or by edge:

  • policy_str: usually 'node:_N' or 'edge:_E'; the _N / _E data stored in the graph is then split into the sub-graphs accordingly
  • partition_book: the BasicPartitionBook instance built above

Examples with one and two partitions

one partition

node_map = F.tensor([0,0,0,0,0,0], F.int64)
edge_map = F.tensor([0,0,0,0,0,0,0], F.int64)
global_nid = F.tensor([0,1,2,3,4,5], F.int64)
global_eid = F.tensor([0,1,2,3,4,5,6], F.int64)

g = dgl.DGLGraph()
g.add_nodes(6)
g.add_edges(0, 1) # 0
g.add_edges(0, 2) # 1
g.add_edges(0, 3) # 2
g.add_edges(2, 3) # 3
g.add_edges(1, 1) # 4
g.add_edges(0, 4) # 5
g.add_edges(2, 5) # 6
g.ndata[dgl.NID] = global_nid
g.edata[dgl.EID] = global_eid
# ShowGraph_without_label(g, None, 5, 5, 'kv_store_testgraph.jpg')

gpb = dgl.distributed.graph_partition_book.BasicPartitionBook(part_id=0,
                                                              num_parts=1,
                                                              node_map=node_map,
                                                              edge_map=edge_map,
                                                              part_graph=g)
node_policy = dgl.distributed.PartitionPolicy(policy_str='node:_N',
                                              partition_book=gpb)
edge_policy = dgl.distributed.PartitionPolicy(policy_str='edge:_E',
                                              partition_book=gpb)

two partitions

def example1_v2():
    node_map = F.tensor([0,0,0,0,1,1], F.int64)
    edge_map = F.tensor([0,0,0,0,1,1,1], F.int64)
    global_nid_1 = F.tensor([0,1,2,3])
    global_eid_1 = F.tensor([0,1,2,3])
    global_nid_2 = F.tensor([0,1,2,4,5], F.int64)
    global_eid_2 = F.tensor([4,5,6], F.int64)

    ## so the partition policy is based on edge!!!
    g1 = dgl.DGLGraph()
    g2 = dgl.DGLGraph()
    g1.add_nodes(4)
    g2.add_nodes(5)
    g1.add_edges(0, 1) # 0
    g1.add_edges(0, 2) # 1
    g1.add_edges(0, 3) # 2
    g1.add_edges(2, 3) # 3
    g2.add_edges(1, 1) # 4
    g2.add_edges(0, 3) # 5
    g2.add_edges(2, 4) # 6
    g1.ndata[dgl.NID] = global_nid_1
    g1.edata[dgl.EID] = global_eid_1
    g2.ndata[dgl.NID] = global_nid_2
    g2.edata[dgl.EID] = global_eid_2
    # ShowGraph_without_label(g1, None, 5, 5, 'kv_store_testgraph_1.jpg')
    # ShowGraph_without_label(g2, None, 5, 5, 'kv_store_testgraph_2.jpg')

    ## wrong! only need one node/edge map
    gpb1 = dgl.distributed.graph_partition_book.BasicPartitionBook(part_id=0,
                                                                    num_parts=2,
                                                                    node_map=node_map,
                                                                    edge_map= edge_map,
                                                                    part_graph=g1)
    gpb2 = dgl.distributed.graph_partition_book.BasicPartitionBook(part_id=1,
                                                                num_parts=2,
                                                                node_map=node_map,
                                                                edge_map= edge_map,
                                                                part_graph=g2)
    node_policy_1 = dgl.distributed.PartitionPolicy(policy_str='node:_N',
                                              partition_book=gpb1)
    edge_policy_1 = dgl.distributed.PartitionPolicy(policy_str='edge:_E',
                                              partition_book=gpb1)
    node_policy_2 = dgl.distributed.PartitionPolicy(policy_str='node:_N',
                                              partition_book=gpb2)
    edge_policy_2 = dgl.distributed.PartitionPolicy(policy_str='edge:_E',
                                              partition_book=gpb2)

KVServer and KVClient

Server and Client

Server

def start_server(server_id, num_clients, num_servers):
    import time
    print("Sleep 5 seconds to test client re-connect")
    time.sleep(5)
    kvstore = dgl.distributed.KVServer(server_id = server_id,
                                        ip_config="kv_ip_config.txt",
                                        num_servers = num_servers,
                                        num_clients = num_clients)
    kvstore.add_part_policy(node_policy)
    kvstore.add_part_policy(edge_policy)
    if kvstore.is_backup_server():
        kvstore.init_data('data_0', 'node:_N')
        kvstore.init_data('data_0_1', 'node:_N')
        kvstore.init_data('data_0_2', 'node:_N')
        kvstore.init_data('data_0_3', 'node:_N')
    else: 
        kvstore.init_data('data_0', 'node:_N', data_0)
        kvstore.init_data('data_0_1', 'node:_N', data_0_1)
        kvstore.init_data('data_0_2', 'node:_N', data_0_2)
        kvstore.init_data('data_0_3', 'node:_N', data_0_3)
    server_state = dgl.distributed.ServerState(kv_store = kvstore, local_g=None, partition_book=None)
    dgl.distributed.start_server(server_id = server_id,
                                ip_config="kv_ip_config.txt",
                                num_servers = num_servers,
                                num_clients = num_clients,
                                server_state = server_state)

First, the parameters of dgl.distributed.KVServer:

  • server_id: the ID of this server; can be used, e.g., to decide whether it is a backup server
  • ip_config: one ip + port per line, where each ip is one machine; if a machine hosts several servers, the base port steps accordingly, e.g.:
# eg2: num_machine = 2, num_servers = N
# write:
# 192.168.1.5 10000
# 192.168.1.5 10000+N
  • num_clients: how many clients will connect to this server
    • (so every server expects the same number of clients?)
  • num_servers: the number of servers per IP
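That port layout can be sketched with a small helper (make_ip_config is hypothetical, not part of DGL), assuming the base port steps by num_servers whenever one IP hosts more than one entry:

```python
def make_ip_config(machines, num_servers, base_port=10000):
    """Return ip_config lines, one 'ip port' pair per machine entry.

    Hypothetical helper for illustration: when the same IP appears
    several times, each later entry's port is offset by num_servers.
    """
    lines = []
    seen = {}
    for ip in machines:
        k = seen.get(ip, 0)
        lines.append(f"{ip} {base_port + k * num_servers}")
        seen[ip] = k + 1
    return lines

# Two "machines" sharing one IP, N = 3 servers each:
print(make_ip_config(["192.168.1.5", "192.168.1.5"], num_servers=3))
# ['192.168.1.5 10000', '192.168.1.5 10003']
```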

Additionally, for kvstore.init_data: if the server is not a backup server, the tensor is copied to shared memory. I am not sure exactly how; the intermediate data seems to go through a DLPack conversion at one point.

  • What is the data_tensor passed to the server-side init_data actually for? It does not seem to be used later: in the client-side push and pull operations the tensors are always addressed via an id_tensor, and there is no id_tensor here, so how is data_tensor retrieved? (Resolved; see the client-side analysis below.)

The server-side init_data only needs a name, a partition policy (as a string), and the concrete tensor values (which can be omitted on a backup server). That is simpler than the client side: the server can initialize the data to whatever values it likes.

Client

def start_client(num_clients, num_servers):
    import os
    os.environ['DGL_DIST_MODE'] = 'distributed'
    dgl.distributed.initialize("kv_ip_config.txt")
    kvclient = dgl.distributed.KVClient("kv_ip_config.txt",
                                        num_servers = num_servers)
    kvclient.map_shared_data(partition_book=gpb)
    assert dgl.distributed.get_num_client()

Here initialize is effectively equivalent to calling three functions: connect_to_server, init_role, and init_kvstore.
Other initialization steps such as KVClient and map_shared_data probably don't need a deep dive.
The client side has quite a few functions, including the usual operations on the kv store, covered in detail below.

init_data

The client-side init_data is a bit more involved than the server side's:

  • name: the key under which the data is indexed in the store
  • shape: the shape (size) of the tensor
  • dtype: the data type
  • part_policy: unlike the server side, which only takes the policy string, this needs a concrete policy object, e.g. the node_policy or edge_policy defined above
  • init_func: an initialization function, which can be user-defined, e.g.
def init_zero_func(shape, dtype):
    return F.zeros(shape, dtype, F.cpu())

def init_two_func(shape,dtype):
    return F.ones(shape, dtype, F.cpu())*2

So a definition looks like this:

    kvclient.init_data(name='data_1', 
                        shape = F.shape(data_1),
                        dtype = F.dtype(data_1),
                        part_policy=edge_policy,
                        init_func=init_zero_func)
    kvclient.init_data(name = 'data_2',
                        shape=F.shape(data_2),
                        dtype=F.dtype(data_2),
                        part_policy=node_policy,
                        init_func=init_two_func)

This adds two new keys to the store, with values determined by the init function. Note that initialization on the client can only go through such a function: if you want to initialize a tensor to arbitrary values, you have to do it on the server side.

data_name_list

    name_list = kvclient.data_name_list()
    print(name_list)

Prints the names of all keys in the store, regardless of part_policy.

get_data_meta

    meta = kvclient.get_data_meta('data_0')
    dtype, shape, policy = meta
    assert dtype == F.dtype(data_0)
    assert shape == F.shape(data_0)
    assert policy.policy_str == 'node:_N'

Fetches a key's metadata: no tensor values here, only dtype, shape, and policy.

push

push, as the name suggests, writes data into an existing key in the store. Mind the dimensionality of the tensor under a key: when pushing, you must specify which rows of the stored tensor get replaced, and that is done via an id_tensor. For typical graph workloads the tensors are 2D.

    id_tensor = F.tensor([0, 2, 4], F.int64)
    data_tensor = F.tensor([[7., 7.], [7., 7.], [7., 7.]], F.float32)
    kvclient.push(name='data_0',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_1',
                  id_tensor=id_tensor,
                  data_tensor = data_tensor)
    kvclient.push(name='data_2',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)

As the code shows, rows 0, 2, and 4 of data_0, data_1, and data_2 are each replaced by rows 0, 1, and 2 of data_tensor.
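The row-replacement semantics can be sketched in plain Python (my illustration, not DGL internals):

```python
# The stored tensor behind 'data_0', as plain Python rows of zeros.
data_0 = [[0., 0.] for _ in range(6)]

id_tensor = [0, 2, 4]
data_tensor = [[7., 7.], [7., 7.], [7., 7.]]

# push: row id_tensor[i] of the stored tensor is replaced by row i of data_tensor.
for i, row_id in enumerate(id_tensor):
    data_0[row_id] = data_tensor[i]

assert data_0[0] == [7., 7.] and data_0[2] == [7., 7.] and data_0[4] == [7., 7.]
assert data_0[1] == [0., 0.]   # untouched rows keep their old values
```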

pull

pull is the opposite of push: it reads data.

    id_tensor = F.tensor([1,4], F.int64)
    res = kvclient.pull(name='data_0',
                        id_tensor=id_tensor)
    print(res)
    res = kvclient.pull(name='data_2',
                        id_tensor=id_tensor)
    print(res)
    res = kvclient.pull(name='data_1',
                        id_tensor=id_tensor)
    print(res)

Pulls the rows selected by id_tensor. The id_tensor deserves some attention here: if the stored tensor is 2D, pulling just works. But if the tensor is 1D, how do you fetch a single value? In my test, a 1D id_tensor simply printed the entire 1D tensor once. Changing it to

    id_tensor = F.tensor([[1], [2], [4]], F.int64)
    res = kvclient.pull(name='data_0_1',
                        id_tensor=id_tensor)
    print('data_0_1', data_0_1)

i.e. a nested id_tensor of shape (3, 1), raises an error.

So it's best to keep stored tensors 2D.

  • I haven't tried what happens with higher-dimensional tensors.
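The row-gather that pull performs on a 2D tensor can be sketched in plain Python (my illustration, not DGL internals), which also shows why whole rows come back cleanly only in the 2D case:

```python
# The stored tensor behind 'data_0': 6 rows of 2 values each.
data_0 = [[0., 0.], [1., 1.], [2., 2.], [3., 3.], [4., 4.], [5., 5.]]

id_tensor = [1, 4]
res = [data_0[i] for i in id_tensor]   # gather whole rows by ID

assert res == [[1., 1.], [4., 4.]]
```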

register_push_handler

This function lets you process data_tensor whenever data is pushed to the given key; it is an extension hook on the default push.

def udf_push(target, name, id_tensor, data_tensor):
    target[name][id_tensor] *=  data_tensor
    
###client_func
    kvclient.register_push_handler('data_0', udf_push)
    kvclient.register_push_handler('data_1', udf_push)
    kvclient.register_push_handler('data_2', udf_push)
    id_tensor = F.tensor([0, 2, 4], F.int64)
    kvclient.push(name='data_0',
                  id_tensor=id_tensor,
                  data_tensor=data_tensor)
    kvclient.push(name='data_1',
                  id_tensor=id_tensor,
                  data_tensor = data_tensor)
    kvclient.push(name='data_2',
                   id_tensor=id_tensor,
                   data_tensor = data_tensor)
    kvclient.barrier()
    id_tensor = F.tensor([1,2,4], F.int64)
    res = kvclient.pull(name='data_0',
                        id_tensor=id_tensor)
    print(res)
    res = kvclient.pull(name='data_2',
                        id_tensor=id_tensor)
    print(res)
    res = kvclient.pull(name='data_1',
                        id_tensor=id_tensor)
    print(res)

Here udf_push multiplies the stored tensor element-wise by the newly pushed tensor. Note that each client's push multiplies once: since two clients are defined here, the final value is the original tensor multiplied by the pushed tensor twice.
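That accumulation can be verified locally with a plain-Python sketch (my illustration, not DGL's implementation): two clients, each pushing once with the same multiplicative handler, multiply the stored rows twice.

```python
# A toy in-memory "store": 6 rows initialized to 2.
store = {'data_0': [[2., 2.] for _ in range(6)]}

def udf_push(target, name, id_tensor, data_tensor):
    """Element-wise multiply the addressed rows, like the handler above."""
    for i, row_id in enumerate(id_tensor):
        target[name][row_id] = [a * b for a, b in
                                zip(target[name][row_id], data_tensor[i])]

ids = [0, 2, 4]
payload = [[3., 3.], [3., 3.], [3., 3.]]
for _client in range(2):          # each of the two clients pushes once
    udf_push(store, 'data_0', ids, payload)

assert store['data_0'][0] == [18., 18.]   # 2 * 3 * 3
assert store['data_0'][1] == [2., 2.]     # rows not in ids are untouched
```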

barrier

Since it shows up in the code above, a quick note: it has little to do with the store itself; it essentially makes all clients synchronize at this point.
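A rough local analogy using Python's threading.Barrier (not DGL's implementation; DGL coordinates clients over RPC): no client proceeds past the barrier until all have arrived, so every push lands before any subsequent pull.

```python
import threading

NUM_CLIENTS = 2
barrier = threading.Barrier(NUM_CLIENTS)
events = []
lock = threading.Lock()

def client(i):
    with lock:
        events.append(('push', i))   # work done before the barrier
    barrier.wait()                   # rendezvous point for all clients
    with lock:
        events.append(('pull', i))   # work done after the barrier

threads = [threading.Thread(target=client, args=(i,)) for i in range(NUM_CLIENTS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Every push happens before any pull.
assert all(kind == 'push' for kind, _ in events[:NUM_CLIENTS])
```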

delete_data

    kvclient.delete_data('data_0')
    kvclient.delete_data('data_1')
    kvclient.delete_data('data_2')
    name_list = kvclient.data_name_list()
    print(name_list)

Deletes a key along with its tensor data; afterwards the key no longer exists in the store.

2 Servers and 2 Clients

  • In this setup, every server connects to 2 clients: each server prints "2 Clients connected!" (from rpc_server.start_server()).