# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, eithPer express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import logging
from multiprocessing.reduction import ForkingPickler
from typing import Dict, List, Optional, Union
import torch
from ..typing import NodeType, EdgeType, TensorDataType
from ..utils import convert_to_tensor, share_memory, squeeze
from .feature import DeviceGroup, Feature
from .graph import CSRTopo, Graph
[docs]class Dataset(object):
r""" A dataset manager for all graph topology and feature data.
"""
def __init__(
self,
graph: Union[Graph, Dict[EdgeType, Graph]] = None,
node_features: Union[Feature, Dict[NodeType, Feature]] = None,
edge_features: Union[Feature, Dict[EdgeType, Feature]] = None,
node_labels: Union[TensorDataType, Dict[NodeType, TensorDataType]] = None
):
self.graph = graph
self.node_features = node_features
self.edge_features = edge_features
self.node_labels = squeeze(convert_to_tensor(node_labels))
[docs] def init_graph(
self,
edge_index: Union[TensorDataType, Dict[EdgeType, TensorDataType]] = None,
edge_ids: Union[TensorDataType, Dict[EdgeType, TensorDataType]] = None,
layout: Union[str, Dict[EdgeType, str]] = 'COO',
graph_mode: str = 'ZERO_COPY',
directed: bool = False,
device: Optional[int] = None
):
r""" Initialize the graph storage and build the object of `Graph`.
Args:
edge_index (torch.Tensor or numpy.ndarray): Edge index for graph topo,
2D CPU tensor/numpy.ndarray(homo). A dict should be provided for
heterogenous graph. (default: ``None``)
edge_ids (torch.Tensor or numpy.ndarray): Edge ids for graph edges, A
CPU tensor (homo) or a Dict[EdgeType, torch.Tensor](hetero).
(default: ``None``)
layout (str): The edge layout representation for the input edge index,
should be 'COO', 'CSR' or 'CSC'. (default: 'COO')
graph_mode (str): Mode in graphlearn_torch's ``Graph``, 'CPU', 'ZERO_COPY'
or 'CUDA'. (default: 'ZERO_COPY')
directed (bool): A Boolean value indicating whether the graph topology is
directed. (default: ``False``)
device (torch.device): The target cuda device rank used for graph
operations when graph mode is not "CPU". (default: ``None``)
"""
edge_index = convert_to_tensor(edge_index, dtype=torch.int64)
edge_ids = convert_to_tensor(edge_ids, dtype=torch.int64)
self._directed = directed
if edge_index is not None:
if isinstance(edge_index, dict):
# heterogeneous.
if edge_ids is not None:
assert isinstance(edge_ids, dict)
else:
edge_ids = {}
if not isinstance(layout, dict):
layout = {etype: layout for etype in edge_index.keys()}
csr_topo_dict = {}
for etype, e_idx in edge_index.items():
csr_topo_dict[etype] = CSRTopo(
edge_index=e_idx,
edge_ids=edge_ids.get(etype, None),
layout=layout[etype]
)
self.graph = {}
for etype, csr_topo in csr_topo_dict.items():
g = Graph(csr_topo, graph_mode, device)
g.lazy_init()
self.graph[etype] = g
else:
# homogeneous.
csr_topo = CSRTopo(edge_index, edge_ids, layout)
self.graph = Graph(csr_topo, graph_mode, device)
self.graph.lazy_init()
[docs] def init_node_features(
self,
node_feature_data: Union[TensorDataType, Dict[NodeType, TensorDataType]] = None,
id2idx: Union[TensorDataType, Dict[NodeType, TensorDataType]] = None,
sort_func = None,
split_ratio: Union[float, Dict[NodeType, float]] = 0.0,
device_group_list: Optional[List[DeviceGroup]] = None,
device: Optional[int] = None,
with_gpu: bool = True,
dtype: Optional[torch.dtype] = None
):
r""" Initialize the node feature storage.
Args:
node_feature_data (torch.Tensor or numpy.ndarray): A tensor of the raw
node feature data, should be a dict for heterogenous graph nodes.
(default: ``None``)
id2idx (torch.Tensor or numpy.ndarray): A tensor that maps node id to
index, should be a dict for heterogenous graph nodes.
(default: ``None``)
sort_func: Function for reordering node features. Currently, only features
of homogeneous nodes are supported to reorder. (default: ``None``)
split_ratio (float): The proportion (between 0 and 1) of node feature data
allocated to the GPU, should be a dict for heterogenous graph nodes.
(default: ``0.0``)
device_group_list (List[DeviceGroup]): A list of device groups used for
node feature lookups, the GPU part of feature data will be replicated on
each device group in this list during the initialization. GPUs with
peer-to-peer access to each other should be set in the same device
group properly. (default: ``None``)
device (torch.device): The target cuda device rank used for node feature
lookups when the GPU part is not None.. (default: `None`)
with_gpu (bool): A Boolean value indicating whether the ``Feature`` uses
``UnifiedTensor``. If True, it means ``Feature`` consists of
``UnifiedTensor``, otherwise ``Feature`` is PyTorch CPU Tensor and
``split_ratio``, ``device_group_list`` and ``device`` will be invliad.
(default: ``True``)
dtype (torch.dtype): The data type of node feature elements, if not
specified, it will be automatically inferred. (Default: ``None``).
"""
if node_feature_data is not None:
node_feature_data = convert_to_tensor(node_feature_data, dtype)
id2idx = convert_to_tensor(id2idx)
if id2idx is None and sort_func is not None:
if isinstance(node_feature_data, dict):
logging.warning("'%s': reordering heterogenous graph node features "
"is not supported now.", self.__class__.__name__)
elif self.graph is not None:
# reorder node features of homogeneous graph.
assert isinstance(self.graph, Graph)
if self._directed is None or not self._directed:
csr_topo_rev = self.graph.csr_topo
else:
row, col, eids = self.graph.csr_topo.to_coo()
csr_topo_rev = CSRTopo((col, row), eids, layout='COO')
node_feature_data, id2idx = \
sort_func(node_feature_data, split_ratio, csr_topo_rev)
self.node_features = _build_features(
node_feature_data, id2idx, split_ratio,
device_group_list, device, with_gpu, dtype
)
[docs] def init_edge_features(
self,
edge_feature_data: Union[TensorDataType, Dict[EdgeType, TensorDataType]] = None,
id2idx: Union[TensorDataType, Dict[EdgeType, TensorDataType]] = None,
split_ratio: Union[float, Dict[EdgeType, float]] = 0.0,
device_group_list: Optional[List[DeviceGroup]] = None,
device: Optional[int] = None,
with_gpu: bool = True,
dtype: Optional[torch.dtype] = None
):
r""" Initialize the edge feature storage.
Args:
edge_feature_data (torch.Tensor or numpy.ndarray): A tensor of the raw
edge feature data, should be a dict for heterogenous graph edges.
(default: ``None``)
id2idx (torch.Tensor or numpy.ndarray): A tensor that maps edge id to
index, should be a dict for heterogenous graph edges.
(default: ``None``)
split_ratio (float): The proportion (between 0 and 1) of edge feature data
allocated to the GPU, should be a dict for heterogenous graph edges.
(default: ``0.0``)
device_group_list (List[DeviceGroup]): A list of device groups used for
edge feature lookups, the GPU part of feature data will be replicated on
each device group in this list during the initialization. GPUs with
peer-to-peer access to each other should be set in the same device
group properly. (default: ``None``)
device (torch.device): The target cuda device rank used for edge feature
lookups when the GPU part is not None.. (default: `None`)
with_gpu (bool): A Boolean value indicating whether the ``Feature`` uses
``UnifiedTensor``. If True, it means ``Feature`` consists of
``UnifiedTensor``, otherwise ``Feature`` is PyTorch CPU Tensor and
``split_ratio``, ``device_group_list`` and ``device`` will be invliad.
(default: ``True``)
dtype (torch.dtype): The data type of edge feature elements, if not
specified, it will be automatically inferred. (Default: ``None``).
"""
if edge_feature_data is not None:
self.edge_features = _build_features(
convert_to_tensor(edge_feature_data, dtype), convert_to_tensor(id2idx),
split_ratio, device_group_list, device, with_gpu, dtype
)
[docs] def init_node_labels(
self,
node_label_data: Union[TensorDataType, Dict[NodeType, TensorDataType]] = None
):
r""" Initialize the node label storage.
Args:
node_label_data (torch.Tensor or numpy.ndarray): A tensor of the raw
node label data, should be a dict for heterogenous graph nodes.
(default: ``None``)
"""
if node_label_data is not None:
self.node_labels = squeeze(convert_to_tensor(node_label_data))
[docs] def share_ipc(self):
self.node_labels = share_memory(self.node_labels)
return self.graph, self.node_features, self.edge_features, self.node_labels
[docs] @classmethod
def from_ipc_handle(cls, ipc_handle):
graph, node_features, edge_features, node_labels = ipc_handle
return cls(graph, node_features, edge_features, node_labels)
[docs] def get_graph(self, etype: Optional[EdgeType] = None):
if isinstance(self.graph, Graph):
return self.graph
if isinstance(self.graph, dict):
assert etype is not None
return self.graph.get(etype, None)
return None
[docs] def get_node_types(self):
if isinstance(self.graph, dict):
if not hasattr(self, '_node_types'):
ntypes = set()
for etype in self.graph.keys():
ntypes.add(etype[0])
ntypes.add(etype[2])
self._node_types = list(ntypes)
return self._node_types
return None
[docs] def get_edge_types(self):
if isinstance(self.graph, dict):
if not hasattr(self, '_edge_types'):
self._edge_types = list(self.graph.keys())
return self._edge_types
return None
[docs] def get_node_feature(self, ntype: Optional[NodeType] = None):
if isinstance(self.node_features, Feature):
return self.node_features
if isinstance(self.node_features, dict):
assert ntype is not None
return self.node_features.get(ntype, None)
return None
[docs] def get_edge_feature(self, etype: Optional[EdgeType] = None):
if isinstance(self.edge_features, Feature):
return self.edge_features
if isinstance(self.edge_features, dict):
assert etype is not None
return self.edge_features.get(etype, None)
return None
[docs] def get_node_label(self, ntype: Optional[NodeType] = None):
if isinstance(self.node_labels, torch.Tensor):
return self.node_labels
if isinstance(self.node_labels, dict):
assert ntype is not None
return self.node_labels.get(ntype, None)
return None
def __getitem__(self, key):
return getattr(self, key, None)
def __setitem__(self, key, value):
setattr(self, key, value)
def _build_features(feature_data, id2idx, split_ratio,
device_group_list, device, with_gpu, dtype):
r""" Build `Feature`s for node/edge feature data.
"""
if feature_data is not None:
if isinstance(feature_data, dict):
# heterogeneous.
if not isinstance(split_ratio, dict):
split_ratio = {
graph_type: float(split_ratio)
for graph_type in feature_data.keys()
}
if id2idx is not None:
assert isinstance(id2idx, dict)
else:
id2idx = {}
features = {}
for graph_type, feat in feature_data.items():
features[graph_type] = Feature(
feat, id2idx.get(graph_type, None),
split_ratio.get(graph_type, 0.0),
device_group_list, device, with_gpu,
dtype if dtype is not None else feat.dtype
)
else:
# homogeneous.
features = Feature(
feature_data, id2idx, float(split_ratio),
device_group_list, device, with_gpu,
dtype if dtype is not None else feature_data.dtype
)
else:
features = None
return features
## Pickling Registration
[docs]def rebuild_dataset(ipc_handle):
ds = Dataset.from_ipc_handle(ipc_handle)
return ds
[docs]def reduce_dataset(dataset: Dataset):
ipc_handle = dataset.share_ipc()
return (rebuild_dataset, (ipc_handle, ))
ForkingPickler.register(Dataset, reduce_dataset)