Source code for graphlearn_torch.distributed.dist_graph

# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

from typing import Dict, Optional, Union

import torch

from ..data import Graph
from ..typing import (
  NodeType, EdgeType, PartitionBook,
  HeteroNodePartitionDict, HeteroEdgePartitionDict
)


class DistGraph(object):
  r""" Simple wrapper for graph data with distributed context.

  TODO: support graph operations.

  Args:
    num_partitions: Number of data partitions.
    partition_idx: Data partition idx of current process.
    local_graph: local `Graph` instance.
    node_pb: Partition book which records vertex ids to worker node ids.
      May be ``None``, in which case no validation is performed on it.
    edge_pb: Partition book which records edge ids to worker node ids.
      May be ``None``, in which case no validation is performed on it.

  Note that `local_graph`, `node_pb` and `edge_pb` should be a dictionary
  for hetero data.

  Attributes set by the constructor:
    data_cls: ``'hetero'`` when `local_graph` is a dict, ``'homo'`` when it
      is a single `Graph`.

  Raises:
    ValueError: If `local_graph` is neither a dict nor a `Graph`, or if a
      non-``None`` partition book is neither a dict nor a `PartitionBook`.
    AssertionError: If a partition book's kind (dict vs. `PartitionBook`)
      does not match the kind of `local_graph`.
  """
  def __init__(self,
               num_partitions: int,
               partition_idx: int,
               local_graph: Union[Graph, Dict[EdgeType, Graph]],
               node_pb: Optional[Union[PartitionBook,
                                       HeteroNodePartitionDict]],
               edge_pb: Optional[Union[PartitionBook,
                                       HeteroEdgePartitionDict]]):
    self.num_partitions = num_partitions
    self.partition_idx = partition_idx

    self.local_graph = local_graph
    if isinstance(self.local_graph, dict):
      self.data_cls = 'hetero'
      # Only the graphs are needed here, the edge-type keys are irrelevant.
      for graph in self.local_graph.values():
        graph.lazy_init()
    elif isinstance(self.local_graph, Graph):
      self.data_cls = 'homo'
      self.local_graph.lazy_init()
    else:
      raise ValueError(f"'{self.__class__.__name__}': found invalid input "
                       f"graph type '{type(self.local_graph)}'")

    # `data_cls` must be known before the partition books are validated,
    # since each book is checked against it.
    self.node_pb = node_pb
    self._check_partition_book(self.node_pb, 'node')

    self.edge_pb = edge_pb
    self._check_partition_book(self.edge_pb, 'edge')

  def _check_partition_book(self, pb, kind: str):
    r""" Validate that a partition book matches the local graph's kind.

    Args:
      pb: The node or edge partition book to check; ``None`` is accepted
        and skips all checks.
      kind: Either ``'node'`` or ``'edge'``, only used in error messages.

    Raises:
      ValueError: If `pb` is neither a dict nor a `PartitionBook`.
      AssertionError: If `pb` is a dict but the graph is not hetero, or
        `pb` is a `PartitionBook` but the graph is not homo.
    """
    if pb is None:
      return
    cls_name = self.__class__.__name__
    if isinstance(pb, dict):
      assert self.data_cls == 'hetero', (
        f"'{cls_name}': got a dict {kind} partition book, but the local "
        f"graph is '{self.data_cls}'")
    elif isinstance(pb, PartitionBook):
      assert self.data_cls == 'homo', (
        f"'{cls_name}': got a single {kind} partition book, but the local "
        f"graph is '{self.data_cls}'")
    else:
      raise ValueError(f"'{cls_name}': found invalid input "
                       f"{kind} partition book type '{type(pb)}'")

  def get_local_graph(self, etype: Optional[EdgeType]=None):
    r""" Get a `Graph` obj of a specific edge type.

    Args:
      etype: Edge type to look up. Required for hetero data, ignored for
        homo data.

    Returns:
      ``self.local_graph[etype]`` for hetero data, otherwise the single
      local graph.
    """
    if self.data_cls == 'hetero':
      assert etype is not None, "'etype' is required for hetero data"
      return self.local_graph[etype]
    return self.local_graph

  def get_node_partitions(self, ids: torch.Tensor,
                          ntype: Optional[NodeType]=None):
    r""" Get the partition ids of node ids with a specific node type.

    Args:
      ids: Node ids used to index the node partition book.
      ntype: Node type to look up. Required for hetero data, ignored for
        homo data.

    Returns:
      The result of indexing the (per-type) node partition book with `ids`.

    Note:
      The node partition book may have been given as ``None`` at
      construction time; indexing it then raises a ``TypeError``.
    """
    if self.data_cls == 'hetero':
      assert ntype is not None, "'ntype' is required for hetero data"
      return self.node_pb[ntype][ids]
    return self.node_pb[ids]

  def get_edge_partitions(self, eids: torch.Tensor,
                          etype: Optional[EdgeType]=None):
    r""" Get the partition ids of edge ids with a specific edge type.

    Args:
      eids: Edge ids used to index the edge partition book.
      etype: Edge type to look up. Required for hetero data, ignored for
        homo data.

    Returns:
      The result of indexing the (per-type) edge partition book with `eids`.

    Note:
      The edge partition book may have been given as ``None`` at
      construction time; indexing it then raises a ``TypeError``.
    """
    if self.data_cls == 'hetero':
      assert etype is not None, "'etype' is required for hetero data"
      return self.edge_pb[etype][eids]
    return self.edge_pb[eids]