graphlearn_torch.data

dataset

class Dataset(graph: Graph | Dict[Tuple[str, str, str], Graph] | None = None, node_features: Feature | Dict[str, Feature] | None = None, edge_features: Feature | Dict[Tuple[str, str, str], Feature] | None = None, node_labels: Tensor | ndarray | Dict[str, Tensor | ndarray] | None = None)[source]

Bases: object

A dataset manager for all graph topology and feature data.

init_graph(edge_index: Tensor | ndarray | Dict[Tuple[str, str, str], Tensor | ndarray] | None = None, edge_ids: Tensor | ndarray | Dict[Tuple[str, str, str], Tensor | ndarray] | None = None, layout: str | Dict[Tuple[str, str, str], str] = 'COO', graph_mode: str = 'ZERO_COPY', directed: bool = False, device: int | None = None)[source]

Initialize the graph storage and build the object of Graph.

Parameters:
  • edge_index (torch.Tensor or numpy.ndarray) – Edge index for graph topo, 2D CPU tensor/numpy.ndarray(homo). A dict should be provided for heterogenous graph. (default: None)

  • edge_ids (torch.Tensor or numpy.ndarray) – Edge ids for graph edges, A CPU tensor (homo) or a Dict[EdgeType, torch.Tensor](hetero). (default: None)

  • layout (str) – The edge layout representation for the input edge index, should be ‘COO’, ‘CSR’ or ‘CSC’. (default: ‘COO’)

  • graph_mode (str) – Mode in graphlearn_torch’s Graph, ‘CPU’, ‘ZERO_COPY’ or ‘CUDA’. (default: ‘ZERO_COPY’)

  • directed (bool) – A Boolean value indicating whether the graph topology is directed. (default: False)

  • device (torch.device) – The target cuda device rank used for graph operations when graph mode is not “CPU”. (default: None)

init_node_features(node_feature_data: Tensor | ndarray | Dict[str, Tensor | ndarray] | None = None, id2idx: Tensor | ndarray | Dict[str, Tensor | ndarray] | None = None, sort_func=None, split_ratio: float | Dict[str, float] = 0.0, device_group_list: List[DeviceGroup] | None = None, device: int | None = None, with_gpu: bool = True, dtype: dtype | None = None)[source]

Initialize the node feature storage.

Parameters:
  • node_feature_data (torch.Tensor or numpy.ndarray) – A tensor of the raw node feature data, should be a dict for heterogenous graph nodes. (default: None)

  • id2idx (torch.Tensor or numpy.ndarray) – A tensor that maps node id to index, should be a dict for heterogenous graph nodes. (default: None)

  • sort_func – Function for reordering node features. Currently, only features of homogeneous nodes are supported to reorder. (default: None)

  • split_ratio (float) – The proportion (between 0 and 1) of node feature data allocated to the GPU, should be a dict for heterogenous graph nodes. (default: 0.0)

  • device_group_list (List[DeviceGroup]) – A list of device groups used for node feature lookups, the GPU part of feature data will be replicated on each device group in this list during the initialization. GPUs with peer-to-peer access to each other should be set in the same device group properly. (default: None)

  • device (torch.device) – The target cuda device rank used for node feature lookups when the GPU part is not None.. (default: None)

  • with_gpu (bool) – A Boolean value indicating whether the Feature uses UnifiedTensor. If True, it means Feature consists of UnifiedTensor, otherwise Feature is PyTorch CPU Tensor and split_ratio, device_group_list and device will be invliad. (default: True)

  • dtype (torch.dtype) – The data type of node feature elements, if not specified, it will be automatically inferred. (Default: None).

init_edge_features(edge_feature_data: Tensor | ndarray | Dict[Tuple[str, str, str], Tensor | ndarray] | None = None, id2idx: Tensor | ndarray | Dict[Tuple[str, str, str], Tensor | ndarray] | None = None, split_ratio: float | Dict[Tuple[str, str, str], float] = 0.0, device_group_list: List[DeviceGroup] | None = None, device: int | None = None, with_gpu: bool = True, dtype: dtype | None = None)[source]

Initialize the edge feature storage.

Parameters:
  • edge_feature_data (torch.Tensor or numpy.ndarray) – A tensor of the raw edge feature data, should be a dict for heterogenous graph edges. (default: None)

  • id2idx (torch.Tensor or numpy.ndarray) – A tensor that maps edge id to index, should be a dict for heterogenous graph edges. (default: None)

  • split_ratio (float) – The proportion (between 0 and 1) of edge feature data allocated to the GPU, should be a dict for heterogenous graph edges. (default: 0.0)

  • device_group_list (List[DeviceGroup]) – A list of device groups used for edge feature lookups, the GPU part of feature data will be replicated on each device group in this list during the initialization. GPUs with peer-to-peer access to each other should be set in the same device group properly. (default: None)

  • device (torch.device) – The target cuda device rank used for edge feature lookups when the GPU part is not None.. (default: None)

  • with_gpu (bool) – A Boolean value indicating whether the Feature uses UnifiedTensor. If True, it means Feature consists of UnifiedTensor, otherwise Feature is PyTorch CPU Tensor and split_ratio, device_group_list and device will be invliad. (default: True)

  • dtype (torch.dtype) – The data type of edge feature elements, if not specified, it will be automatically inferred. (Default: None).

init_node_labels(node_label_data: Tensor | ndarray | Dict[str, Tensor | ndarray] | None = None)[source]

Initialize the node label storage.

Parameters:

node_label_data (torch.Tensor or numpy.ndarray) – A tensor of the raw node label data, should be a dict for heterogenous graph nodes. (default: None)

share_ipc()[source]
classmethod from_ipc_handle(ipc_handle)[source]
get_graph(etype: Tuple[str, str, str] | None = None)[source]
get_node_types()[source]
get_edge_types()[source]
get_node_feature(ntype: str | None = None)[source]
get_edge_feature(etype: Tuple[str, str, str] | None = None)[source]
get_node_label(ntype: str | None = None)[source]
rebuild_dataset(ipc_handle)[source]
reduce_dataset(dataset: Dataset)[source]

feature

class DeviceGroup(group_id: int, device_list: List[device])[source]

Bases: object

A group of GPUs with peer-to-peer access (NVLinks) to each other.

Parameters:
  • group_id – An integer representing the rank of the device group.

  • device_list – A list of devices that can be accessed p2p.

property size
class Feature(feature_tensor: Tensor | ndarray, id2index: Tensor | None = None, split_ratio: float = 0.0, device_group_list: List[DeviceGroup] | None = None, device: int | None = None, with_gpu: bool | None = True, dtype: dtype = torch.float32)[source]

Bases: object

A class for feature storage and lookup with hardware topology awareness and high performance.

According to split_ratio, Feature splits the feature data into the GPU part and the CPU part(ZERO-COPY), and the GPU part is replicated between all device groups in the input device group list. Each GPU can p2p access data on other GPUs in the same DeviceGroup it belongs to, and can access data on CPU part with zero copy.

Parameters:
  • feature_tensor (torch.Tensor or numpy.ndarray) – A CPU tensor of the raw feature data.

  • id2index (torch.Tensor, optional) – index in the raw cpu feature tensor. If the feature data in the input feature_tensor are not consecutive and ordered by node ids, this parameter should be provided. (Default: None).

  • split_ratio (float) – The proportion of feature data allocated to the GPU, between 0 and 1. (Default: 0.0).

  • device_group_list (List[DeviceGroup], optional) – A list of device groups used for feature lookups, the GPU part of feature data will be replicated on each device group in this list during the initialization. GPUs with peer-to-peer access to each other should be set in the same device group properly. Note that this parameter will be ignored if the split_ratio set to zero. If set to None, the GPU part will be replicated on all available GPUs got by torch.cuda.device_count(), and each GPU device is an independent group. (Default: None).

  • device (int, optional) – The target cuda device rank to perform feature lookups with the GPU part on the current Feature instance. The value of torch.cuda.current_device() will be used if set to None``(Default: ``None).

  • with_gpu (bool) – A Boolean value indicating whether the Feature uses UnifiedTensor. If True, it means Feature consists of UnifiedTensor, otherwise Feature is PyTorch CPU Tensor and split_ratio, device_group_list and device will be invliad. (Default: True).

  • dtype (torch.dtype) – The data type of feature elements. (Default: torch.float32).

Example

>>> feat_tensor, id2index = sort_by_in_degree(feat_tensor, csr_topo)
>>> # suppose you have 8 GPUs.
>>> # if there is no NVLink.
>>> device_groups = [DeviceGroup(i, [i]) for i in range(8)]
>>> # if there are NVLinks between GPU0-3 and GPU4-7.
>>> device_groups = [DeviceGroup(0, [0,1,2,3]), DeviceGroup(1, [4,5,6,7])]
>>> # Split the cpu feature tensor, of which the GPU part accounts for 60%.
>>> # Launch the GPU kernel on device 0 for this ``Feature`` instance.
>>> feature = Feature(feat_tensor, id2index, 0.6, device_groups, 0)
>>> out = feature[input]
TODO(baole): Support to automatically find suitable GPU groups. For now,

you can use nvidia-smi topo -m to find the right groups.

cpu_get(ids: Tensor)[source]

Perform feature lookups only with CPU feature tensor.

share_ipc()[source]

Create ipc handle for multiprocessing.

classmethod from_ipc_handle(ipc_handle)[source]
lazy_init_with_ipc_handle()[source]
property shape
size(dim)[source]
rebuild_feature(ipc_handle)[source]
reduce_feature(feature: Feature)[source]

graph

class CSRTopo(edge_index: Tensor | ndarray | Tuple[Tensor | ndarray, Tensor | ndarray], edge_ids: Tensor | ndarray | None = None, layout: str = 'COO')[source]

Bases: object

Graph topology in CSR format.

Parameters:
  • edge_index (a 2D torch.Tensor or numpy.ndarray, or a tuple) – The edge index for graph topology.

  • edge_ids (torch.Tensor or numpy.ndarray, optional) – The edge ids for graph edges. If set to None, it will be aranged by the edge size. (default: None)

  • layout (str) – The edge layout representation for the input edge index, should be ‘COO’ (rows and cols uncompressed), ‘CSR’ (rows compressed) or ‘CSC’ (columns compressed). (default: ‘COO’)

to_coo() Tuple[Tensor, Tensor, Tensor][source]

Convert to COO format.

Returns:

row indice tensor, column indice tensor and edge id tensor

to_csc() Tuple[Tensor, Tensor, Tensor][source]

Convert to CSC format.

Returns:

row indice tensor, column ptr tensor and edge id tensor

property indptr
property indices
property edge_ids

local edge ids in CSR.

property degrees
property row_count
property edge_count
share_memory_()[source]
class Graph(csr_topo: CSRTopo, mode='ZERO_COPY', device: int | None = None)[source]

Bases: object

A graph object used for graph operations such as sampling.

There are three modes supported:
1.’CPU’: graph data are stored in the CPU memory and graph

operations are also executed on CPU.

2.’ZERO_COPY’: graph data are stored in the pinned CPU memory and graph

operations are executed on GPU.

3.’CUDA’: graph data are stored in the GPU memory and graph operations

are executed on GPU.

Parameters:
  • csr_topo (CSRTopo) – An instance of CSRTopo with graph topology data.

  • mode (str) – The graph operation mode, must be ‘CPU’, ‘ZERO_COPY’ or ‘CUDA’. (Default: ‘ZERO_COPY’).

  • device (int, optional) – The target cuda device rank to perform graph operations. Note that this parameter will be ignored if the graph mode set to ‘CPU’. The value of torch.cuda.current_device() will be used if set to None. (Default: None).

lazy_init()[source]
share_ipc()[source]

Create ipc handle for multiprocessing.

Returns:

A tuple of csr_topo and graph mode.

classmethod from_ipc_handle(ipc_handle)[source]

Create from ipc handle.

property row_count
property col_count
property edge_count
property graph_handler

Get a pointer to the underlying graph object for graph operations such as sampling.

rebuild_graph(ipc_handle)[source]
reduce_graph(graph: Graph)[source]

reorder

sort_by_in_degree(cpu_tensor, shuffle_ratio, csr_topo)[source]

table_dataset

class TableDataset(edge_tables=None, node_tables=None, graph_mode='ZERO_COPY', sort_func=None, split_ratio=0.0, device_group_list=None, directed=True, reader_threads=10, reader_capacity=10240, reader_batch_size=1024, label=None, device=None, **kwargs)[source]

Bases: Dataset

unified_tensor

class UnifiedTensor(current_device: int, dtype: dtype = torch.float32)[source]

Bases: object

Creates a CPU and GPUs unified Tensor for GPU direct access. For the tensor stored in the CPU memory, we use ZERO-COPY to provide efficient GPU access. For tensors stored in the GPU memory, p2p access between GPUs(such as NVLink) is required.

Parameters:
  • current_device (int) – An integer to represent the GPU device where the underlying cuda operation kernel is launched.

  • dtype (torch.dtype) – The data type of the tensor elements.

append_shared_tensor(shared_tensor)[source]

Append from SharedTensor.

Parameters:

shared_tensor – A pywrap.SharedTensor object which means GPU tensor that can be shared with other GPUs.

append_cpu_tensor(cpu_tensor: Tensor)[source]

Append from CPU tensor.

Parameters:

cpu_tensor – A CPU torch.Tensor object which will be stored in pinned memory for ZERO-COPY.

init_from(tensors: List[Tensor], tensor_devices: List[int])[source]

Initialize from CPU torch.Tensors.

Parameters:
  • tensors – CPU torch.Tensors indicating the tensors that need to be stored on different GPUs and CPU.

  • tensor_devices – The indices of devices indicating the location of the tensor storage, -1 means on CPU and other > 0 value means on GPUs.

  • other. (Note that tensors and tensor_devices must correspond to each) –

property shape
property device
property numel
size(dim)[source]
stride(dim)[source]
share_ipc()[source]

Shares ipc handles.

Returns:

A list of cuda ipcs and cpu part tensor.

from_ipc_handle(cuda_ipc_list, cpu_part)[source]

Builds from ipc handle.

Parameters:
  • cuda_ipc_list – A list of CUDA ipcs, in the same order as tensors_devices.

  • cpu_part – A CPU torch.Tensor.

classmethod new_from_ipc(ipc_handles, current_device: int, dtype: dtype)[source]

Creates UnifiedTensor from ipc handles.

Parameters:
  • ipc_handles – ipc handles consists of CUDA ipcs and cpu part torch.Tensor.

  • current_device (int) – An integer to represent the GPU device where the underlying cuda operation kernel is launched.

  • dtype (torch.dtype) – The data type of the tensor elements.

Returns:

A UnifiedTensor instance.

vineyard_utils

vineyard_to_csr(sock, fid, elid=0, vlid=0, haseid=0)[source]

Wrap to_csr function to read graph from vineyard with return (indptr, indices, (Optional)edge_id)

load_vertex_feature_from_vineyard(sock, fid, vcols, vlid=0, dtype='float32')[source]

Wrap load_vertex_feature_from_vineyard function to read vertex feature from vineyard dtype: torch data type return vertex_feature(torch.Tensor)

load_edge_feature_from_vineyard(sock, fid, ecols, elid=0, dtype='float32')[source]

Wrap load_edge_feature_from_vineyard function to read edge feature from vineyard dtype: torch data type return edge_feature(torch.Tensor)