graphlearn_torch.distributed
dist_client
- init_client(num_servers: int, num_clients: int, client_rank: int, master_addr: str, master_port: int, num_rpc_threads: int = 4, client_group_name: str | None = None)
Initialize the current process as a client and establish connections with all servers and other clients. Note that this method should be called only in the server-client distribution mode.
- Parameters:
num_servers (int) – Number of processes participating in the server group.
num_clients (int) – Number of processes participating in the client group.
client_rank (int) – Rank of the current process within the client group (a number between 0 and num_clients-1).
master_addr (str) – The master TCP address for RPC connections between all servers and clients. This value must be the same for all servers and clients.
master_port (int) – The master TCP port for RPC connections between all servers and clients. This value must be the same for all servers and clients.
num_rpc_threads (int) – The number of RPC worker threads used by the current client. (Default: 4)
client_group_name (str, optional) – A unique name of the client group that the current process belongs to. If set to None, a default name will be used. (Default: None)
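The master address and port must be identical on every server and client, but ranks are numbered separately inside each group. One way to keep these consistent is to derive every process's arguments from a single description.

The following is a hypothetical helper, not part of graphlearn_torch. It builds keyword-argument dicts using the parameter names documented for init_client and init_server. The dataset argument of init_server is deliberately left out.

```python
from typing import Dict, List


def build_launch_args(num_servers: int, num_clients: int,
                      master_addr: str, master_port: int) -> Dict[str, List[dict]]:
    """Hypothetical helper: per-process kwargs for init_server / init_client.

    Every process shares master_addr/master_port; ranks are assigned
    independently inside each group, from 0 to group_size - 1.
    """
    if num_servers < 1 or num_clients < 1:
        raise ValueError("each group needs at least one process")
    common = dict(num_servers=num_servers, num_clients=num_clients,
                  master_addr=master_addr, master_port=master_port)
    servers = [dict(common, server_rank=r) for r in range(num_servers)]
    clients = [dict(common, client_rank=r) for r in range(num_clients)]
    return {"servers": servers, "clients": clients}


args = build_launch_args(2, 3, "10.0.0.1", 11234)
# On client process i one could then call: init_client(**args["clients"][i])
print(args["clients"][2]["client_rank"])  # 2
```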
- shutdown_client()
Shut down the client on the current process, notify the servers to exit, and destroy all connections.
dist_context
- class DistRole(value)
Bases: Enum
Role types for distributed context groups.
- WORKER = 1
- SERVER = 2
- CLIENT = 3
- class DistContext(role: DistRole, group_name: str, world_size: int, rank: int, global_world_size: int, global_rank: int)
Bases: object
Distributed context info of the current process.
- Parameters:
role (DistRole) – The role type of the current context group.
group_name (str) – A unique name of the current role group.
world_size (int) – The number of processes in the current role group.
rank (int) – The current process rank within the current role group.
global_world_size (int) – The total number of processes in all role groups.
global_rank (int) – The current process rank within all role groups.
- property worker_name: str
The worker name of the current process in this context.
- get_context() -> DistContext
Get distributed context info of the current process.
- init_worker_group(world_size: int, rank: int, group_name: str | None = None)
Initialize a simple worker group on the current process. This method should be called only in a non-server distribution mode with a group of parallel workers.
- Parameters:
world_size (int) – Number of all processes participating in the distributed worker group.
rank (int) – Rank of the current process within the distributed group (a number between 0 and world_size-1).
group_name (str, optional) – A unique name of the distributed group that the current process belongs to. If set to None, a default name will be used.
dist_dataset
- class DistDataset(num_partitions: int = 1, partition_idx: int = 0, graph_partition: Graph | Dict[Tuple[str, str, str], Graph] | None = None, node_feature_partition: Feature | Dict[str, Feature] | None = None, edge_feature_partition: Feature | Dict[Tuple[str, str, str], Feature] | None = None, whole_node_labels: Tensor | ndarray | Dict[str, Tensor | ndarray] | None = None, node_pb: Tensor | Dict[str, Tensor] | None = None, edge_pb: Tensor | Dict[Tuple[str, str, str], Tensor] | None = None, node_feat_pb: Tensor | Dict[str, Tensor] | None = None, edge_feat_pb: Tensor | Dict[Tuple[str, str, str], Tensor] | None = None)
Bases: Dataset
Graph and feature dataset with distributed partition info.
- load(root_dir: str, partition_idx: int, graph_mode: str = 'ZERO_COPY', feature_with_gpu: bool = True, device_group_list: List[DeviceGroup] | None = None, whole_node_label_file: str | Dict[str, str] | None = None, device: int | None = None)
Load a certain dataset partition from partitioned files and create in-memory objects (Graph, Feature or torch.Tensor).
- Parameters:
root_dir (str) – The directory path from which to load the graph and feature partition data.
partition_idx (int) – Index of the partition to load.
graph_mode (str) – Mode for creating graphlearn_torch’s Graph, one of ‘CPU’, ‘ZERO_COPY’ or ‘CUDA’. (default: ‘ZERO_COPY’)
feature_with_gpu (bool) – Whether the created Feature objects of node/edge features use UnifiedTensor. If True, Feature consists of UnifiedTensor; otherwise, Feature is a PyTorch CPU Tensor, and device_group_list and device have no effect. (default: True)
device_group_list (List[DeviceGroup], optional) – A list of device groups used for feature lookups. The GPU part of the feature data will be replicated on each device group in this list during initialization. GPUs with peer-to-peer access to each other should be placed in the same device group. (default: None)
whole_node_label_file (str or Dict[str, str], optional) – The path to the whole node labels, which are not partitioned. (default: None)
device (int, optional) – The target CUDA device rank used for graph operations when graph_mode is not ‘CPU’, and for feature lookups when the GPU part is not None. (default: None)
- property node_feat_pb
- property edge_feat_pb
- reduce_dist_dataset(dataset: DistDataset)
dist_feature
- class RpcFeatureLookupCallee(dist_feature)
Bases: RpcCalleeBase
A wrapper for an rpc callee that performs feature lookups requested by remote processes.
- class DistFeature(num_partitions: int, partition_idx: int, local_feature: Feature | Dict[str | Tuple[str, str, str], Feature], feature_pb: Tensor | Dict[str, Tensor] | Dict[Tuple[str, str, str], Tensor], local_only: bool = False, rpc_router: RpcDataPartitionRouter | None = None, device: device | None = None)
Bases: object
Distributed feature data manager for global feature lookups.
- Parameters:
num_partitions – Number of data partitions.
partition_idx – Data partition idx of the current process.
local_feature – Local Feature instance.
feature_pb – Partition book that maps node/edge ids to the ids of the worker nodes holding them in the feature store.
local_only – Use this instance only for local feature lookup or stitching. If set to True, the related rpc callee will not be registered, and users must ensure that lookups for remote features are not invoked through this instance. (default: False)
device – Device used for computing. (default: None)
Note that local_feature and feature_pb should be dictionaries for heterogeneous data.
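A partition book is essentially a lookup table from a global node (or edge) id to the partition that owns it. This stdlib-only sketch shows how a lookup for a batch of ids could be split by owning partition. It also records each id's position, so results can later be written back in the original order.

This illustrates the idea only. Plain lists stand in for the torch.Tensor partition book, and group_ids_by_partition is a hypothetical name, not a DistFeature method.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_ids_by_partition(ids: List[int], feature_pb: List[int]
                           ) -> Dict[int, Tuple[List[int], List[int]]]:
    """Hypothetical sketch: partition idx -> (positions in `ids`, the ids).

    feature_pb[i] is assumed to hold the partition idx storing global id i.
    """
    groups: Dict[int, Tuple[List[int], List[int]]] = defaultdict(lambda: ([], []))
    for pos, gid in enumerate(ids):
        positions, members = groups[feature_pb[gid]]
        positions.append(pos)    # where this id's feature goes in the output
        members.append(gid)      # which id to request from that partition
    return dict(groups)


pb = [0, 0, 1, 1, 0, 1]          # six nodes spread over two partitions
print(group_ids_by_partition([5, 0, 3, 4], pb))
# {1: ([0, 2], [5, 3]), 0: ([1, 3], [0, 4])}
```

Entries for the current partition_idx would be served locally. The rest would become remote requests.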
dist_graph
- class DistGraph(num_partitions: int, partition_idx: int, local_graph: Graph | Dict[Tuple[str, str, str], Graph], node_pb: Tensor | Dict[str, Tensor], edge_pb: Tensor | Dict[Tuple[str, str, str], Tensor])
Bases: object
Simple wrapper for graph data with distributed context.
TODO: support graph operations.
- Parameters:
num_partitions – Number of data partitions.
partition_idx – Data partition idx of the current process.
local_graph – Local Graph instance.
node_pb – Partition book that maps vertex ids to worker node ids.
edge_pb – Partition book that maps edge ids to worker node ids.
Note that local_graph, node_pb and edge_pb should be dictionaries for heterogeneous data.
- get_local_graph(etype: Tuple[str, str, str] | None = None)
Get the local Graph object of a specific edge type.
dist_neighbor_loader
- class DistNeighborLoader(data: DistDataset | None, num_neighbors: List[int] | Dict[Tuple[str, str, str], List[int]], input_nodes: Tensor | str | Tuple[str, Tensor], batch_size: int = 1, shuffle: bool = False, drop_last: bool = False, with_edge: bool = False, collect_features: bool = False, to_device: device | None = None, worker_options: CollocatedDistSamplingWorkerOptions | MpDistSamplingWorkerOptions | RemoteDistSamplingWorkerOptions | None = None)
Bases: DistLoader
A distributed loader that performs sampling from nodes.
- Parameters:
data (DistDataset, optional) – The DistDataset object of a partition of graph data and feature data, along with distributed partition books. The input dataset must be provided in non-server distribution mode.
num_neighbors (List[int] or Dict[Tuple[str, str, str], List[int]]) – The number of neighbors to sample for each node in each iteration. In heterogeneous graphs, this may also be a dictionary specifying the number of neighbors to sample for each individual edge type.
input_nodes (torch.Tensor or Tuple[str, torch.Tensor]) – The seed nodes from which neighbors are sampled to create mini-batches. In heterogeneous graphs, this must be passed as a tuple that holds the node type and the node seeds.
batch_size (int) – How many samples per batch to load. (default: 1)
shuffle (bool) – Set to True to have the data reshuffled at every epoch. (default: False)
drop_last (bool) – Set to True to drop the last incomplete batch if the dataset size is not divisible by the batch size. If False and the dataset size is not divisible by the batch size, the last batch will be smaller. (default: False)
with_edge (bool) – Set to True to sample with edge ids and include them in the sampled results. (default: False)
collect_features (bool) – Set to True to collect features for the nodes of each sampled subgraph. (default: False)
to_device (torch.device, optional) – The target device that the sampled results should be copied to. If set to None, the current CUDA device (obtained via torch.cuda.current_device()) will be used if available; otherwise, the CPU device will be used. (default: None)
worker_options (optional) – The options for launching sampling workers. (1) If set to None or provided with a CollocatedDistSamplingWorkerOptions object, a single collocated sampler will be launched on the current process, and the separate sampling mode will be disabled. (2) If provided with an MpDistSamplingWorkerOptions object, the sampling workers will be launched on spawned subprocesses, and a shared-memory based channel will be created to pass sampled messages from the multiprocessing workers to the current loader. (3) If provided with a RemoteDistSamplingWorkerOptions object, the sampling workers will be launched on remote sampling server nodes, and a remote channel will be created for cross-machine message passing. (default: None)
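The batch_size, shuffle and drop_last options follow the usual data-loader conventions. The stdlib-only sketch below shows how seed batches over input_nodes could be formed under those conventions. It is not the loader's actual implementation, and iter_seed_batches is a hypothetical name.

```python
import random
from typing import Iterator, List, Optional


def iter_seed_batches(input_nodes: List[int], batch_size: int = 1,
                      shuffle: bool = False, drop_last: bool = False,
                      seed: Optional[int] = None) -> Iterator[List[int]]:
    """Hypothetical sketch: split seed nodes into mini-batches for one epoch."""
    order = list(input_nodes)
    if shuffle:
        random.Random(seed).shuffle(order)   # a fresh order for this epoch
    for start in range(0, len(order), batch_size):
        batch = order[start:start + batch_size]
        if drop_last and len(batch) < batch_size:
            break                            # discard the trailing short batch
        yield batch


seeds = list(range(10))
print([len(b) for b in iter_seed_batches(seeds, batch_size=4)])                  # [4, 4, 2]
print([len(b) for b in iter_seed_batches(seeds, batch_size=4, drop_last=True)])  # [4, 4]
```

Each yielded batch is what one neighbor-sampling request would start from.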
dist_neighbor_sampler
- class PartialNeighborOutput(index: Tensor, output: NeighborOutput)
Bases: object
The sampled neighbor output of a subset of the original ids.
index: the index of the subset vertex ids.
output: the sampled neighbor output.
- index: Tensor
- output: NeighborOutput
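Each PartialNeighborOutput carries the index of the subset it was computed for. Results sampled on different partitions can therefore be stitched back into the order of the original ids.

In this minimal sketch, a list of neighbor ids per vertex stands in for NeighborOutput. PartialOutput and stitch are hypothetical names.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PartialOutput:
    """Hypothetical stand-in for PartialNeighborOutput."""
    index: List[int]           # positions of the subset within the original ids
    output: List[List[int]]    # sampled neighbors, one list per subset vertex


def stitch(partials: List[PartialOutput], num_ids: int) -> List[List[int]]:
    """Scatter each partial result back to the positions recorded in `index`."""
    result: List[List[int]] = [[] for _ in range(num_ids)]
    for part in partials:
        for pos, nbrs in zip(part.index, part.output):
            result[pos] = nbrs
    return result


# Original ids [7, 2, 9]: positions 0 and 2 were sampled remotely, 1 locally.
remote = PartialOutput(index=[0, 2], output=[[1, 3], [4]])
local = PartialOutput(index=[1], output=[[8, 5]])
print(stitch([remote, local], 3))  # [[1, 3], [8, 5], [4]]
```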
- class RpcSamplingCallee(sampler: NeighborSampler, device: device)
Bases: RpcCalleeBase
A wrapper for an rpc callee that performs rpc sampling requested by remote processes.
- class RpcSubGraphCallee(sampler: NeighborSampler, device: device)
Bases: RpcCalleeBase
A wrapper for an rpc callee that performs rpc sampling requested by remote processes.
- class DistNeighborSampler(data: DistDataset, num_neighbors: List[int] | Dict[Tuple[str, str, str], List[int]] | None = None, with_edge: bool = False, with_neg: bool = False, collect_features: bool = False, channel: ChannelBase | None = None, concurrency: int = 1, device: device | None = None)
Bases: ConcurrentEventLoop
Asynchronous, distributed neighbor sampler.
- Parameters:
data (DistDataset) – The graph and feature data with partition info.
num_neighbors (NumNeighbors) – The number of neighbors to sample on each hop.
with_edge (bool) – Whether to sample with edge ids. (default: False)
collect_features (bool) – Whether to collect features for sampled results. (default: False)
channel (ChannelBase, optional) – The message channel to send sampled results to. If set to None, the sampled results will be returned directly by sample_from_nodes. (default: None)
concurrency (int) – The maximum number of seed batches processed concurrently by the current sampler. (default: 1)
device – The device to use for sampling. If set to None, the current CUDA device (obtained via torch.cuda.current_device()) will be used if available; otherwise, the CPU device will be used. (default: None)
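num_neighbors gives one fanout per hop. Hop k samples at most num_neighbors[k] neighbors for every node newly reached at hop k-1.

The single-machine sketch below illustrates that idea on an adjacency dict. It ignores partitioning, RPC, edge ids and feature collection, and sample_multi_hop is a hypothetical name, not a DistNeighborSampler method.

```python
import random
from typing import Dict, List, Set


def sample_multi_hop(adj: Dict[int, List[int]], seeds: List[int],
                     num_neighbors: List[int], seed: int = 0) -> Set[int]:
    """Return all nodes reached by fanout-limited sampling from `seeds`."""
    rng = random.Random(seed)
    visited: Set[int] = set(seeds)
    frontier = list(seeds)
    for fanout in num_neighbors:                 # one iteration per hop
        next_frontier: List[int] = []
        for node in frontier:
            nbrs = adj.get(node, [])
            picked = nbrs if len(nbrs) <= fanout else rng.sample(nbrs, fanout)
            for nbr in picked:
                if nbr not in visited:           # expand each node only once
                    visited.add(nbr)
                    next_frontier.append(nbr)
        frontier = next_frontier
    return visited


adj = {0: [1, 2, 3], 1: [4], 2: [5], 3: [], 4: [], 5: []}
print(sorted(sample_multi_hop(adj, [0], [3, 1])))  # [0, 1, 2, 3, 4, 5]
```

In the distributed sampler, neighbors of nodes owned by other partitions would instead be requested over RPC.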
- sample_from_nodes(inputs: NodeSamplerInput, **kwargs) -> Dict[str, Tensor] | None
Sample multi-hop neighbors from nodes, collect the remote features (optional), and send the results to the output channel.
Note that if an output channel is specified, this function is asynchronous and the sampled result is not returned directly. Otherwise, this function blocks until the sampled result is ready and returns it.
- Parameters:
inputs (NodeSamplerInput) – The input data with node indices to start sampling from.
- sample_from_edges(inputs: EdgeSamplerInput, **kwargs) -> Dict[str, Tensor] | None
Sample multi-hop neighbors from edges, collect the remote features (optional), and send the results to the output channel.
Note that if an output channel is specified, this function is asynchronous and the sampled result is not returned directly. Otherwise, this function blocks until the sampled result is ready and returns it.
- Parameters:
inputs (EdgeSamplerInput) – The input data for sampling from edges, including (1) the source node indices, (2) the destination node indices, (3) the optional edge labels and (4) the input edge type.
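sample_from_nodes and sample_from_edges support two calling conventions. With a channel, they return None at once and push the result later. Without one, they block and return the result.

This sketch shows both conventions with a queue.Queue standing in for ChannelBase and a thread standing in for the event loop. ToySampler and its placeholder sampling step are hypothetical.

```python
import queue
import threading
from typing import Dict, List, Optional


class ToySampler:
    """Hypothetical sketch of the channel-or-return convention."""

    def __init__(self, channel: Optional[queue.Queue] = None):
        self.channel = channel

    def _sample(self, seeds: List[int]) -> Dict[str, List[int]]:
        # Placeholder for real multi-hop sampling.
        return {"ids": seeds, "neighbors": [s + 100 for s in seeds]}

    def sample_from_nodes(self, seeds: List[int]) -> Optional[Dict[str, List[int]]]:
        if self.channel is None:
            return self._sample(seeds)       # blocking: wait, then return the result
        worker = threading.Thread(
            target=lambda: self.channel.put(self._sample(seeds)), daemon=True)
        worker.start()                       # asynchronous: result arrives later
        return None


print(ToySampler().sample_from_nodes([1, 2]))   # {'ids': [1, 2], 'neighbors': [101, 102]}
ch: queue.Queue = queue.Queue()
ToySampler(ch).sample_from_nodes([3])           # returns None immediately
print(ch.get(timeout=1))                        # {'ids': [3], 'neighbors': [103]}
```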
- subgraph(inputs: NodeSamplerInput, **kwargs) -> Dict[str, Tensor] | None
Induce an enclosing subgraph based on the inputs and their neighbors (if self.num_neighbors is not None).
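Inducing a subgraph keeps every edge whose two endpoints both lie in the chosen node set. It usually also relabels the nodes to consecutive local ids.

This minimal sketch works over an edge list. Here the node set would be the inputs plus any sampled neighbors. induce_subgraph is a hypothetical helper, and the exact output layout of subgraph() is not described in this reference.

```python
from typing import Dict, List, Tuple


def induce_subgraph(nodes: List[int], edges: List[Tuple[int, int]]
                    ) -> Tuple[List[int], List[Tuple[int, int]]]:
    """Keep edges with both endpoints in `nodes`, relabelled to local ids."""
    local: Dict[int, int] = {}
    for n in nodes:                        # first occurrence defines the local id
        local.setdefault(n, len(local))
    sub_edges = [(local[u], local[v]) for u, v in edges
                 if u in local and v in local]
    return list(local), sub_edges


edges = [(0, 1), (1, 2), (2, 3), (3, 0), (1, 3)]
print(induce_subgraph([1, 3, 0], edges))
# ([1, 3, 0], [(2, 0), (1, 2), (0, 1)])
```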
dist_options
- class CollocatedDistSamplingWorkerOptions(master_addr: str | None = None, master_port: int | str | None = None, num_rpc_threads: int | None = None, rpc_timeout: float = 180)
Bases: _BasicDistSamplingWorkerOptions
Options for launching a single distributed sampling worker collocated with the current process.
- Parameters:
master_addr (str, optional) – Master address for rpc initialization across all sampling workers. (default: None)
master_port (str or int, optional) – Master port for rpc initialization across all sampling workers. (default: None)
num_rpc_threads (int, optional) – Number of threads used for the rpc agent on each sampling worker. (default: None)
rpc_timeout (float) – The timeout in seconds for rpc requests. (default: 180)
Please refer to _BasicDistSamplingWorkerOptions for more detailed descriptions of the related input arguments.
- class MpDistSamplingWorkerOptions(num_workers: int = 1, worker_devices: List[device] | None = None, worker_concurrency: int = 4, master_addr: str | None = None, master_port: int | str | None = None, num_rpc_threads: int | None = None, rpc_timeout: float = 180, channel_size: int | str | None = None, pin_memory: bool = False)
Bases: _BasicDistSamplingWorkerOptions
Options for launching distributed sampling workers with multiprocessing.
Note that if MpDistSamplingWorkerOptions is used, all sampling workers will be launched on subprocesses spawned by torch.multiprocessing. Thus, a shared-memory based channel should be created for passing the sampled results, which are produced by those multiprocessing sampling workers and consumed by the current process.
- Parameters:
num_workers (int) – How many workers to use (subprocesses to spawn) for distributed neighbor sampling of the current process. (default: 1)
worker_devices (torch.device or List[torch.device], optional) – List of devices assigned to workers of this group. (default: None)
worker_concurrency (int) – The max sampling concurrency for each sampling worker. (default: 4)
master_addr (str, optional) – Master address for rpc initialization across all sampling workers. (default: None)
master_port (str or int, optional) – Master port for rpc initialization across all sampling workers. (default: None)
num_rpc_threads (int, optional) – Number of threads used for the rpc agent on each sampling worker. (default: None)
rpc_timeout (float) – The timeout in seconds for rpc requests. (default: 180)
channel_size (int or str) – The shared-memory buffer size (in bytes) allocated for the channel. If set to None, num_workers * 64MB will be used. (default: None)
pin_memory (bool) – Set to True to register the underlying shared memory for CUDA, which improves performance when copying loaded data from the channel to a CUDA device. (default: False)
Please refer to _BasicDistSamplingWorkerOptions for more detailed descriptions of the related input arguments.
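When channel_size is left as None, the documented default scales with num_workers. The arithmetic is sketched below under one assumption this reference does not state: that "64MB" means 64 MiB (64 × 1024 × 1024 bytes).

default_channel_bytes is a hypothetical helper. It does not handle string sizes.

```python
from typing import Optional

MIB = 1024 * 1024  # assumption: "MB" in the docs means mebibytes


def default_channel_bytes(num_workers: int, channel_size: Optional[int] = None) -> int:
    """Hypothetical helper: bytes implied by channel_size or its documented default."""
    if channel_size is not None:
        return channel_size               # an explicit integer size wins
    return num_workers * 64 * MIB         # documented default: num_workers * 64MB


print(default_channel_bytes(4))  # 268435456 (256 MiB)
```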
- class RemoteDistSamplingWorkerOptions(server_rank: int | None = None, num_workers: int = 1, worker_devices: List[device] | None = None, worker_concurrency: int = 4, master_addr: str | None = None, master_port: int | str | None = None, num_rpc_threads: int | None = None, rpc_timeout: float = 180, buffer_size: int | str | None = None, prefetch_size: int = 4)
Bases: _BasicDistSamplingWorkerOptions
Options for launching distributed sampling workers on remote servers.
Note that if RemoteDistSamplingWorkerOptions is used, all sampling workers will be launched on remote servers. Thus, a cross-machine channel will be created for passing the sampled results, which are produced by those remote sampling workers and consumed by the current process.
- Parameters:
server_rank (int) – The rank of the server on which to launch sampling workers. If set to None, it will be assigned automatically. (default: None)
num_workers (int) – How many workers to launch on the remote server for distributed neighbor sampling of the current process. (default: 1)
worker_devices (torch.device or List[torch.device], optional) – List of devices assigned to workers of this group. (default: None)
worker_concurrency (int) – The max sampling concurrency for each sampling worker. (default: 4)
master_addr (str, optional) – Master address for rpc initialization across all sampling workers. (default: None)
master_port (str or int, optional) – Master port for rpc initialization across all sampling workers. (default: None)
num_rpc_threads (int, optional) – Number of threads used for the rpc agent on each sampling worker. (default: None)
rpc_timeout (float) – The timeout in seconds for rpc requests. (default: 180)
buffer_size (int or str) – The size (in bytes) allocated for the server-side buffer. If set to None, num_workers * 64MB will be used. (default: None)
prefetch_size (int) – The maximum number of prefetched sampled messages to be consumed on the client side. (default: 4)
dist_sampling_producer
- MP_STATUS_CHECK_INTERVAL = 5.0
Interval (in seconds) to check status of processes to avoid hanging in multiprocessing sampling.
- class MpCommand(value)
Bases: Enum
Enum class for multiprocessing sampling commands.
- SAMPLE_ALL = 0
- STOP = 1
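MP_STATUS_CHECK_INTERVAL exists so that a blocking wait cannot hang forever. Instead of waiting indefinitely, the consumer waits at most one interval and then checks whether the producer is still alive.

The sketch below combines that polling pattern with the two MpCommand values mirrored from above. It uses threads instead of spawned subprocesses, and the worker and consume functions are hypothetical simplifications, not the library's code.

```python
import enum
import queue
import threading

MP_STATUS_CHECK_INTERVAL = 5.0


class MpCommand(enum.Enum):   # mirrors the documented values
    SAMPLE_ALL = 0
    STOP = 1


def worker(commands: queue.Queue, results: queue.Queue) -> None:
    """Hypothetical sampling worker: act on commands until STOP arrives."""
    while True:
        cmd, payload = commands.get()
        if cmd is MpCommand.STOP:
            break
        for batch in payload:                 # SAMPLE_ALL: one result per batch
            results.put(sorted(batch))        # stand-in for real sampling
    results.put(None)                         # end-of-stream marker


def consume(results: queue.Queue, producer: threading.Thread,
            interval: float = MP_STATUS_CHECK_INTERVAL) -> list:
    out = []
    while True:
        try:
            item = results.get(timeout=interval)
        except queue.Empty:
            if not producer.is_alive():       # producer died: fail instead of hanging
                raise RuntimeError("sampling worker exited unexpectedly")
            continue
        if item is None:
            return out
        out.append(item)


cmds, res = queue.Queue(), queue.Queue()
t = threading.Thread(target=worker, args=(cmds, res), daemon=True)
t.start()
cmds.put((MpCommand.SAMPLE_ALL, [[3, 1], [2]]))
cmds.put((MpCommand.STOP, None))
print(consume(res, t, interval=0.1))  # [[1, 3], [2]]
```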
- class DistMpSamplingProducer(data: DistDataset, sampler_input: NodeSamplerInput | EdgeSamplerInput, sampling_config: SamplingConfig, worker_options: _BasicDistSamplingWorkerOptions, output_channel: ChannelBase)
Bases: object
A subprocess group of distributed sampling workers.
Note that this producer is only used for workloads with separate sampling and training; all sampled results will be sent to the output channel.
- class DistCollocatedSamplingProducer(data: DistDataset, sampler_input: NodeSamplerInput | EdgeSamplerInput, sampling_config: SamplingConfig, worker_options: _BasicDistSamplingWorkerOptions, device: device)
Bases: object
A sampling producer with a collocated distributed sampler.
Note that the sampled results will be returned directly, and this producer blocks while processing each batch.
dist_server
- SERVER_EXIT_STATUS_CHECK_INTERVAL = 5.0
Interval (in seconds) to check exit status of server.
- class DistServer(dataset: DistDataset)
Bases: object
A server that supports launching remote sampling workers for training clients.
Note that this server is enabled only when the distribution mode is a server-client framework, in which the graph and feature store are partitioned and managed by all server nodes.
- Parameters:
dataset (DistDataset) – The DistDataset object of a partition of graph data and feature data, along with distributed partition books.
- get_dataset_meta()
Get the meta info of the distributed dataset managed by the current server, including partition info and graph types.
- create_sampling_producer(sampler_input: NodeSamplerInput | EdgeSamplerInput, sampling_config: SamplingConfig, worker_options: RemoteDistSamplingWorkerOptions) -> int
Create and initialize an instance of DistSamplingProducer with a group of subprocesses for distributed sampling.
- Parameters:
sampler_input (NodeSamplerInput or EdgeSamplerInput) – The input data for sampling.
sampling_config (SamplingConfig) – Configuration of sampling meta info.
worker_options (RemoteDistSamplingWorkerOptions) – Options for launching remote sampling workers by this server.
- Returns:
A unique id of created sampling producer on this server.
- destroy_sampling_producer(producer_id: int)
Shut down and destroy a sampling producer managed by this server, identified by its producer id.
- get_server() -> DistServer
Get the DistServer instance on the current process.
- init_server(num_servers: int, num_clients: int, server_rank: int, dataset: DistDataset, master_addr: str, master_port: int, num_rpc_threads: int = 16, request_timeout: int = 180, server_group_name: str | None = None)
Initialize the current process as a server and establish connections with all other servers and clients. Note that this method should be called only in the server-client distribution mode.
- Parameters:
num_servers (int) – Number of processes participating in the server group.
num_clients (int) – Number of processes participating in the client group.
server_rank (int) – Rank of the current process within the server group (a number between 0 and num_servers-1).
dataset (DistDataset) – The DistDataset object of a partition of graph data and feature data, along with distributed partition book info.
master_addr (str) – The master TCP address for RPC connections between all servers and clients. This value must be the same for all servers and clients.
master_port (int) – The master TCP port for RPC connections between all servers and clients. This value must be the same for all servers and clients.
num_rpc_threads (int) – The number of RPC worker threads used by the current server to respond to remote requests. (Default: 16)
request_timeout (int) – The maximum timeout in seconds for remote requests; an exception will be raised when it is exceeded. (Default: 180)
server_group_name (str, optional) – A unique name of the server group that the current process belongs to. If set to None, a default name will be used. (Default: None)
event_loop
- class ConcurrentEventLoop(concurrency)
Bases: object
Concurrent event loop context.
- Parameters:
concurrency – max processing concurrency.
- add_task(coro, callback=None)
Add an asynchronous coroutine task to run.
- Parameters:
coro – the async coroutine func.
callback – the callback func applied on the returned results after the coroutine task is finished.
Note that any value returned by the callback will be ignored, so handle the results entirely inside the callback rather than returning them.
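Here is a rough sketch of the add_task contract, built on asyncio. Coroutines run on an event loop in a background thread, at most `concurrency` of them are in flight at once, and the callback receives each coroutine's return value while its own return value is discarded.

SketchEventLoop and its wait_all and shutdown helpers are hypothetical. The real ConcurrentEventLoop may bound concurrency differently.

```python
import asyncio
import threading


class SketchEventLoop:
    """Hypothetical bounded-concurrency event loop running on a daemon thread."""

    def __init__(self, concurrency: int):
        self._concurrency = concurrency
        self._slots = threading.BoundedSemaphore(concurrency)
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def add_task(self, coro, callback=None):
        self._slots.acquire()   # blocks the caller while `concurrency` tasks are in flight
        future = asyncio.run_coroutine_threadsafe(coro, self._loop)

        def _on_done(fut):
            try:
                if callback is not None and fut.exception() is None:
                    callback(fut.result())   # whatever callback returns is ignored
            finally:
                self._slots.release()

        future.add_done_callback(_on_done)

    def wait_all(self):
        # Holding every slot at once means no task is still running.
        for _ in range(self._concurrency):
            self._slots.acquire()
        for _ in range(self._concurrency):
            self._slots.release()

    def shutdown(self):
        self.wait_all()
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def square(x):
    await asyncio.sleep(0.01)
    return x * x


results = []
loop = SketchEventLoop(concurrency=2)
for i in range(5):
    loop.add_task(square(i), callback=results.append)
loop.shutdown()
print(sorted(results))  # [0, 1, 4, 9, 16]
```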
rpc
- get_rpc_current_group_worker_names() -> List[str]
Get the rpc worker names (sorted by rank) of the current group.
- get_rpc_worker_names() -> Dict[DistRole, List[str]]
Get the rpc worker names (each list sorted by rank) of every role group.
- all_gather(obj, timeout=None)
Gathers objects from the current role group only, in a list. This function blocks until all workers in the current role group have received the gathered results. Its implementation is based on torch.distributed.rpc.api._all_gather.
- barrier(timeout=None)
Block until all local and remote RPC processes in the current role group reach this method.
- global_all_gather(obj, timeout=None)
Gathers objects from all role groups in a list, using the implementation of torch.distributed.rpc.api._all_gather.
- global_barrier(timeout=None)
Block until all local and remote RPC processes across all role groups reach this method.
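all_gather gives every member of a role group the same list of contributions, ordered by rank. barrier blocks until every member has arrived.

The in-process simulation below shows those semantics, with threads standing in for RPC workers. It is not how the RPC-based implementation works internally, and ThreadGroup is a hypothetical name.

```python
import threading
from typing import Any, List


class ThreadGroup:
    """Hypothetical in-process stand-in for one role group of `world_size` workers."""

    def __init__(self, world_size: int):
        self.world_size = world_size
        self._slots: List[Any] = [None] * world_size
        self._barrier = threading.Barrier(world_size)

    def barrier(self, timeout=None) -> None:
        self._barrier.wait(timeout)        # returns once all members have arrived

    def all_gather(self, rank: int, obj: Any, timeout=None) -> List[Any]:
        self._slots[rank] = obj            # publish this rank's contribution
        self.barrier(timeout)              # wait until every rank has published
        gathered = list(self._slots)       # identical rank-ordered copy on every rank
        self.barrier(timeout)              # keep slots stable until all have copied
        return gathered


group = ThreadGroup(world_size=3)
outputs: List[Any] = [None] * 3


def run(rank: int) -> None:
    outputs[rank] = group.all_gather(rank, f"worker-{rank}")


threads = [threading.Thread(target=run, args=(r,)) for r in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(outputs[0])  # ['worker-0', 'worker-1', 'worker-2']
```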
- init_rpc(master_addr: str, master_port: int, num_rpc_threads: int = 16, rpc_timeout: float = 180)
Initialize rpc on the current process.
- shutdown_rpc(graceful=True)
Shut down the rpc agent on the current process.
If graceful is set to False, other mechanisms must ensure that all rpc requests are completed before the rpc servers are shut down.
- class RpcDataPartitionRouter(partition2workers: List[List[str]])
Bases: object
A router that selects a remote rpc worker holding a certain data partition to perform an rpc request.
- rpc_sync_data_partitions(num_data_partitions: int, current_partition_idx: int)
Synchronize the data partition info across all workers in the current role group only.
Note that all data should be partitioned and used within a single role group.
- Parameters:
num_data_partitions (int) – The number of all data partitions.
current_partition_idx (int) – The data partition idx that the current process is responsible for; computation tasks on this data partition may be sent to the current process by remote workers.
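rpc_sync_data_partitions and RpcDataPartitionRouter can be pictured together in three steps. Every worker contributes a (worker name, partition idx) pair. The gathered pairs are grouped into a partition2workers table. A router then picks one of the workers holding the requested partition.

The sketch below assumes round-robin selection, which this reference does not document. build_partition2workers, RoundRobinRouter and select_worker are all hypothetical names.

```python
import itertools
from typing import Dict, Iterator, List, Tuple


def build_partition2workers(gathered: List[Tuple[str, int]],
                            num_data_partitions: int) -> List[List[str]]:
    """Group gathered (worker_name, partition_idx) pairs by partition."""
    table: List[List[str]] = [[] for _ in range(num_data_partitions)]
    for worker_name, partition_idx in gathered:
        table[partition_idx].append(worker_name)
    if any(not workers for workers in table):
        raise ValueError("some data partition has no responsible worker")
    return table


class RoundRobinRouter:
    """Hypothetical router: cycle through the workers of each partition."""

    def __init__(self, partition2workers: List[List[str]]):
        self._cycles: Dict[int, Iterator[str]] = {
            idx: itertools.cycle(workers)
            for idx, workers in enumerate(partition2workers)}

    def select_worker(self, partition_idx: int) -> str:
        return next(self._cycles[partition_idx])


table = build_partition2workers(
    [("w0", 0), ("w1", 1), ("w2", 0), ("w3", 1)], num_data_partitions=2)
print(table)  # [['w0', 'w2'], ['w1', 'w3']]
router = RoundRobinRouter(table)
print([router.select_worker(0) for _ in range(3)])  # ['w0', 'w2', 'w0']
```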
- class RpcCalleeBase
Bases: ABC
A base wrapper for an rpc callee that performs rpc requests from remote processes.
Note that the callee will be called only from rpc workers in the current role group.
- rpc_register(callee: RpcCalleeBase)
Register a callee for rpc requests only in the current role group. This method blocks until all local and remote RPC processes of the current role group reach it.
- rpc_request_async(worker_name, callee_id, args=None, kwargs=None)
Perform an rpc request asynchronously within the current role group and return a future.
- rpc_request(worker_name, callee_id, args=None, kwargs=None)
Perform an rpc request synchronously within the current role group and return the results.
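The callee pattern pairs a registry with request functions. Registering a callee hands back an id, and requests look the callee up by that id on the target worker.

The single-process sketch below mimics that flow with a thread pool, and no real RPC takes place. The `call` method name, the registry and the ignored worker_name are assumptions for illustration, not graphlearn_torch's API.

```python
from abc import ABC, abstractmethod
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Dict, List


class CalleeBase(ABC):
    """Hypothetical analogue of RpcCalleeBase."""

    @abstractmethod
    def call(self, *args, **kwargs):
        ...


_registry: List[CalleeBase] = []
_pool = ThreadPoolExecutor(max_workers=4)


def register(callee: CalleeBase) -> int:
    _registry.append(callee)
    return len(_registry) - 1            # callee id = registration order


def request_async(worker_name: str, callee_id: int, args=None, kwargs=None) -> Future:
    # worker_name is ignored: everything runs in this process.
    return _pool.submit(_registry[callee_id].call, *(args or ()), **(kwargs or {}))


def request(worker_name: str, callee_id: int, args=None, kwargs=None):
    return request_async(worker_name, callee_id, args, kwargs).result()  # block


class FeatureLookup(CalleeBase):
    def __init__(self, table: Dict[int, List[float]]):
        self.table = table

    def call(self, ids):
        return [self.table[i] for i in ids]


cid = register(FeatureLookup({0: [0.1], 1: [0.2]}))
print(request("worker-1", cid, args=([1, 0],)))  # [[0.2], [0.1]]
```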