# Copyright 2022 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
from multiprocessing.reduction import ForkingPickler
from typing import Dict, List, Optional, Union
import torch
from ..data import Dataset, Graph, Feature, DeviceGroup
from ..partition import load_partition, cat_feature_cache
from ..typing import (
NodeType, EdgeType, TensorDataType,
PartitionBook, HeteroNodePartitionDict, HeteroEdgePartitionDict
)
from ..utils import share_memory
class DistDataset(Dataset):
    r""" Graph and feature dataset with distributed partition info.

    In addition to the graph, features and labels managed by the base
    ``Dataset``, this class stores the total number of partitions, the index
    of the locally held partition, the node/edge partition books, and
    optional separate partition books for node/edge features.

    Args:
        num_partitions (int): Total number of partitions. (default: ``1``)
        partition_idx (int): Index of the partition held by this dataset.
            (default: ``0``)
        graph_partition: The local graph partition, either a single
            ``Graph`` or a dict keyed by edge type.
        node_feature_partition: The local node feature partition, either a
            single ``Feature`` or a dict keyed by node type.
        edge_feature_partition: The local edge feature partition, either a
            single ``Feature`` or a dict keyed by edge type.
        whole_node_labels: Labels of all nodes (not partitioned), or a dict
            of such labels keyed by node type.
        node_pb: Partition book for nodes. Must not be ``None`` when a graph
            is present (checked by assertion).
        edge_pb: Partition book for edges.
        node_feat_pb: Partition book for node features. When ``None``,
            ``node_pb`` is used for node feature lookups.
        edge_feat_pb: Partition book for edge features. When ``None``,
            ``edge_pb`` is used for edge feature lookups.
    """

    def __init__(
        self,
        num_partitions: int = 1,
        partition_idx: int = 0,
        graph_partition: Optional[Union[Graph, Dict[EdgeType, Graph]]] = None,
        node_feature_partition: Optional[
            Union[Feature, Dict[NodeType, Feature]]] = None,
        edge_feature_partition: Optional[
            Union[Feature, Dict[EdgeType, Feature]]] = None,
        whole_node_labels: Optional[
            Union[TensorDataType, Dict[NodeType, TensorDataType]]] = None,
        node_pb: Optional[Union[PartitionBook, HeteroNodePartitionDict]] = None,
        edge_pb: Optional[Union[PartitionBook, HeteroEdgePartitionDict]] = None,
        node_feat_pb: Optional[
            Union[PartitionBook, HeteroNodePartitionDict]] = None,
        edge_feat_pb: Optional[
            Union[PartitionBook, HeteroEdgePartitionDict]] = None,
    ):
        super().__init__(
            graph_partition,
            node_feature_partition,
            edge_feature_partition,
            whole_node_labels
        )
        self.num_partitions = num_partitions
        self.partition_idx = partition_idx
        self.node_pb = node_pb
        self.edge_pb = edge_pb
        # As the loaded feature partition may be concatenated with its cached
        # features and the partition book for features will be modified, thus
        # we need to distinguish them with the original graph partition books.
        #
        # If the `node_feat_pb` or `edge_feat_pb` is not provided, the
        # `node_pb` or `edge_pb` will be used instead for feature lookups.
        self._node_feat_pb = node_feat_pb
        self._edge_feat_pb = edge_feat_pb
        if self.graph is not None:
            assert self.node_pb is not None, (
                'DistDataset: `node_pb` is required when a graph partition '
                'is given.'
            )
        if self.node_features is not None:
            assert self.node_pb is not None or self._node_feat_pb is not None, (
                'DistDataset: `node_pb` or `node_feat_pb` is required when '
                'node features are given.'
            )
        if self.edge_features is not None:
            assert self.edge_pb is not None or self._edge_feat_pb is not None, (
                'DistDataset: `edge_pb` or `edge_feat_pb` is required when '
                'edge features are given.'
            )

    def load(
        self,
        root_dir: str,
        partition_idx: int,
        graph_mode: str = 'ZERO_COPY',
        feature_with_gpu: bool = True,
        device_group_list: Optional[List[DeviceGroup]] = None,
        whole_node_label_file: Optional[Union[str, Dict[NodeType, str]]] = None,
        device: Optional[int] = None
    ):
        r""" Load a certain dataset partition from partitioned files and create
        in-memory objects (``Graph``, ``Feature`` or ``torch.Tensor``).

        ``num_partitions``, ``partition_idx``, ``node_pb`` and ``edge_pb`` are
        overwritten with the values returned by ``load_partition``.

        Args:
            root_dir (str): The directory path to load the graph and feature
                partition data.
            partition_idx (int): Partition idx to load.
            graph_mode (str): Mode for creating graphlearn_torch's `Graph`,
                including 'CPU', 'ZERO_COPY' or 'CUDA'. (default: 'ZERO_COPY')
            feature_with_gpu (bool): A Boolean value indicating whether the
                created ``Feature`` objects of node/edge features use
                ``UnifiedTensor``. If True, it means ``Feature`` consists of
                ``UnifiedTensor``, otherwise ``Feature`` is a PyTorch CPU
                Tensor, and ``device_group_list`` and ``device`` are not used
                for features. (default: ``True``)
            device_group_list (List[DeviceGroup], optional): A list of device
                groups used for feature lookups, the GPU part of feature data
                will be replicated on each device group in this list during
                the initialization. GPUs with peer-to-peer access to each
                other should be set in the same device group properly.
                (default: ``None``)
            whole_node_label_file (str or Dict[NodeType, str], optional): The
                path to the whole node labels which are not partitioned, or a
                dict mapping each node type to such a path. Each file is read
                with ``torch.load``. (default: ``None``)
            device (int, optional): The target cuda device rank used for graph
                operations when graph mode is not "CPU" and feature lookups
                when the GPU part is not None. (default: ``None``)
        """
        (
            self.num_partitions,
            self.partition_idx,
            graph_data,
            node_feat_data,
            edge_feat_data,
            self.node_pb,
            self.edge_pb
        ) = load_partition(root_dir, partition_idx)

        # init graph partition
        if isinstance(graph_data, dict):
            # heterogeneous: one edge index / edge id set per edge type.
            edge_index, edge_ids = {}, {}
            for k, v in graph_data.items():
                edge_index[k] = v.edge_index
                edge_ids[k] = v.eids
        else:
            # homogeneous.
            edge_index = graph_data.edge_index
            edge_ids = graph_data.eids
        self.init_graph(edge_index, edge_ids, layout='COO',
                        graph_mode=graph_mode, device=device)

        # load node feature partition; the returned feature partition book
        # replaces `_node_feat_pb` because cached features change the layout.
        if node_feat_data is not None:
            node_cache_ratio, node_feat, node_feat_id2idx, node_feat_pb = \
                _cat_feature_cache(partition_idx, node_feat_data, self.node_pb)
            self.init_node_features(
                node_feat, node_feat_id2idx, None, node_cache_ratio,
                device_group_list, device, feature_with_gpu, dtype=None
            )
            self._node_feat_pb = node_feat_pb

        # load edge feature partition
        if edge_feat_data is not None:
            edge_cache_ratio, edge_feat, edge_feat_id2idx, edge_feat_pb = \
                _cat_feature_cache(partition_idx, edge_feat_data, self.edge_pb)
            self.init_edge_features(
                edge_feat, edge_feat_id2idx, edge_cache_ratio,
                device_group_list, device, feature_with_gpu, dtype=None
            )
            self._edge_feat_pb = edge_feat_pb

        # load whole node labels.
        # NOTE: `torch.load` unpickles its input; only pass trusted files.
        if whole_node_label_file is not None:
            if isinstance(whole_node_label_file, dict):
                whole_node_labels = {}
                for ntype, file in whole_node_label_file.items():
                    whole_node_labels[ntype] = torch.load(file)
            else:
                whole_node_labels = torch.load(whole_node_label_file)
            self.init_node_labels(whole_node_labels)

    def share_ipc(self):
        r""" Prepare this dataset for transfer to another process.

        Calls the base class's ``share_ipc``, passes every partition book
        through ``share_memory``, and returns a tuple whose element order
        matches the ``__init__`` parameters, so that ``from_ipc_handle`` can
        rebuild the dataset with ``cls(*ipc_handle)``.
        """
        super().share_ipc()
        self.node_pb = share_memory(self.node_pb)
        self.edge_pb = share_memory(self.edge_pb)
        self._node_feat_pb = share_memory(self._node_feat_pb)
        self._edge_feat_pb = share_memory(self._edge_feat_pb)
        ipc_handle = (
            self.num_partitions, self.partition_idx,
            self.graph, self.node_features, self.edge_features, self.node_labels,
            self.node_pb, self.edge_pb, self._node_feat_pb, self._edge_feat_pb
        )
        return ipc_handle

    @classmethod
    def from_ipc_handle(cls, ipc_handle):
        r""" Rebuild a dataset from the tuple returned by ``share_ipc``. """
        return cls(*ipc_handle)

    @property
    def node_feat_pb(self):
        r""" Partition book for node feature lookups, falling back to
        ``node_pb`` when no separate feature partition book is set.
        """
        if self._node_feat_pb is None:
            return self.node_pb
        return self._node_feat_pb

    @property
    def edge_feat_pb(self):
        r""" Partition book for edge feature lookups, falling back to
        ``edge_pb`` when no separate feature partition book is set.
        """
        if self._edge_feat_pb is None:
            return self.edge_pb
        return self._edge_feat_pb
def _cat_feature_cache(partition_idx, raw_feat_data, raw_feat_pb):
    r""" Concatenate a feature partition with its cached features.

    For a homogeneous graph, ``raw_feat_data`` is a single feature partition
    and ``raw_feat_pb`` its partition book. For a heterogeneous graph, both
    are dicts keyed by graph type, and each entry is processed separately.

    Returns:
        A 4-tuple ``(cache_ratio, feat_data, feat_id2idx, feat_pb)``. In the
        heterogeneous case each element is a dict keyed by graph type.
    """
    if not isinstance(raw_feat_data, dict):
        # homogeneous: a single partition and a single partition book.
        ratio, data, id2idx, pb = cat_feature_cache(
            partition_idx, raw_feat_data, raw_feat_pb)
        return ratio, data, id2idx, pb

    # heterogeneous: collect each output into a dict keyed by graph type.
    ratios, datas, id2idxs, pbs = {}, {}, {}, {}
    for gtype, raw_feat in raw_feat_data.items():
        ratio, data, id2idx, pb = cat_feature_cache(
            partition_idx, raw_feat, raw_feat_pb[gtype])
        ratios[gtype] = ratio
        datas[gtype] = data
        id2idxs[gtype] = id2idx
        pbs[gtype] = pb
    return ratios, datas, id2idxs, pbs
## Pickling Registration
def rebuild_dist_dataset(ipc_handle):
    r""" Reconstruct a ``DistDataset`` from a handle produced by
    ``DistDataset.share_ipc`` (the unpickling half of the reducer).
    """
    return DistDataset.from_ipc_handle(ipc_handle)
def reduce_dist_dataset(dataset: DistDataset):
    r""" Pickle reducer for ``DistDataset``.

    Calls ``dataset.share_ipc()`` and returns a ``(callable, args)`` pair, so
    unpickling invokes ``rebuild_dist_dataset`` on the resulting handle.
    """
    handle = dataset.share_ipc()
    rebuild_args = (handle,)
    return rebuild_dist_dataset, rebuild_args
# Register a custom reducer so that a DistDataset pickled by multiprocessing's
# ForkingPickler goes through `reduce_dist_dataset`. That function calls
# `share_ipc()`, and the receiving side rebuilds the dataset via
# `rebuild_dist_dataset`.
ForkingPickler.register(DistDataset, reduce_dist_dataset)