graphlearn_torch.distributed

dist_client

init_client(num_servers: int, num_clients: int, client_rank: int, master_addr: str, master_port: int, num_rpc_threads: int = 4, client_group_name: str | None = None)[source]

Initialize the current process as a client and establish connections with all other servers and clients. Note that this method should be called only in the server-client distribution mode.

Parameters:
  • num_servers (int) – Number of processes participating in the server group.

  • num_clients (int) – Number of processes participating in the client group.

  • client_rank (int) – Rank of the current process within the client group (it should be a number between 0 and num_clients-1).

  • master_addr (str) – The master TCP address for the RPC connection between all servers and clients. The value must be the same on all servers and clients.

  • master_port (int) – The master TCP port for the RPC connection between all servers and clients. The value must be the same on all servers and clients.

  • num_rpc_threads (int) – The number of RPC worker threads used for the current client. (Default: 4).

  • client_group_name (str) – A unique name of the client group that current process belongs to. If set to None, a default name will be used. (Default: None).

shutdown_client()[source]

Shutdown the client on the current process, notify other servers to exit, and destroy all connections.
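The client lifecycle described above (init_client, then work, then shutdown_client) can be sketched as follows. This is a minimal sketch rather than the library's own example: run_client and work are hypothetical names, and the module is passed in as dist (assumed to be graphlearn_torch.distributed) so the call order can be checked without a running cluster.

```python
def run_client(dist, num_servers, num_clients, client_rank,
               master_addr, master_port, work):
    """Initialize a client, run `work`, and always shut the client down.

    `dist` is assumed to be the graphlearn_torch.distributed module;
    `work` is a hypothetical zero-argument callable holding the training code.
    """
    # client_rank must lie in [0, num_clients - 1], as documented above.
    if not 0 <= client_rank < num_clients:
        raise ValueError(f"client_rank {client_rank} not in [0, {num_clients - 1}]")
    dist.init_client(
        num_servers=num_servers,
        num_clients=num_clients,
        client_rank=client_rank,
        master_addr=master_addr,  # same value on every server and client
        master_port=master_port,  # same value on every server and client
    )
    try:
        return work()
    finally:
        # Runs even if `work` raises, so the servers are still notified.
        dist.shutdown_client()
```

Wrapping the work in try/finally is a design choice of this sketch: it keeps a failing client from leaving the servers waiting for a shutdown notification.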

async_request_server(server_rank: int, func, *args, **kwargs)[source]

Entry point for performing an asynchronous request on a remote server, called on the client side.

request_server(server_rank: int, func, *args, **kwargs)[source]

Entry point for performing a synchronous request on a remote server, called on the client side.

dist_context

class DistRole(value)[source]

Bases: Enum

Role types for distributed context groups.

WORKER = 1
SERVER = 2
CLIENT = 3
class DistContext(role: DistRole, group_name: str, world_size: int, rank: int, global_world_size: int, global_rank: int)[source]

Bases: object

Distributed context info of the current process.

Parameters:
  • role (DistRole) – The role type of the current context group.

  • group_name (str) – A unique name of the current role group.

  • world_size (int) – The number of processes in the current role group.

  • rank (int) – The current process rank within the current role group.

  • global_world_size (int) – The total number of processes in all role groups.

  • global_rank (int) – The current process rank within all role groups.

is_worker() bool[source]
is_server() bool[source]
is_client() bool[source]
num_servers() int[source]
num_clients() int[source]
property worker_name: str

Get worker name of the current process of this context.
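To make the fields above concrete, here is a small stand-in that mirrors the documented DistContext attributes and two of its role checks. Role and Context are hypothetical names used only for illustration, not the library classes, and the example values are arbitrary: how rank maps to global_rank is not specified here.

```python
from dataclasses import dataclass
from enum import Enum


class Role(Enum):
    # Same values as the documented DistRole enum.
    WORKER = 1
    SERVER = 2
    CLIENT = 3


@dataclass(frozen=True)
class Context:
    """Hypothetical stand-in that mirrors the documented DistContext fields."""
    role: Role
    group_name: str
    world_size: int         # processes in this role group
    rank: int               # rank within this role group
    global_world_size: int  # processes across all role groups
    global_rank: int        # rank across all role groups

    def is_server(self) -> bool:
        return self.role is Role.SERVER

    def is_client(self) -> bool:
        return self.role is Role.CLIENT


# Example values only; the group name and ranks are made up.
ctx = Context(Role.CLIENT, "clients", world_size=2, rank=1,
              global_world_size=4, global_rank=3)
```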

get_context() DistContext[source]

Get distributed context info of the current process.

init_worker_group(world_size: int, rank: int, group_name: str | None = None)[source]

Initialize a simple worker group on the current process. This method should be called only in a non-server distribution mode with a group of parallel workers.

Parameters:
  • world_size (int) – Number of all processes participating in the distributed worker group.

  • rank (int) – Rank of the current process within the distributed group (it should be a number between 0 and world_size-1).

  • group_name (str) – A unique name of the distributed group that current process belongs to. If set to None, a default name will be used.
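One way to obtain the arguments above is to read them from the environment of each launched process. The sketch below assumes the RANK and WORLD_SIZE variables that torchrun sets; that convention and the helper name worker_group_args are assumptions, since this reference does not prescribe where the values come from.

```python
import os


def worker_group_args(env=os.environ, group_name=None):
    """Build keyword arguments for init_worker_group from environment variables.

    RANK and WORLD_SIZE are an assumed convention (the one torchrun uses).
    """
    world_size = int(env["WORLD_SIZE"])
    rank = int(env["RANK"])
    # rank must be a number between 0 and world_size - 1.
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} not in [0, {world_size - 1}]")
    return {"world_size": world_size, "rank": rank, "group_name": group_name}
```

The result could then be passed on as init_worker_group(**worker_group_args()).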

dist_dataset

class DistDataset(num_partitions: int = 1, partition_idx: int = 0, graph_partition: Graph | Dict[Tuple[str, str, str], Graph] | None = None, node_feature_partition: Feature | Dict[str, Feature] | None = None, edge_feature_partition: Feature | Dict[Tuple[str, str, str], Feature] | None = None, whole_node_labels: Tensor | ndarray | Dict[str, Tensor | ndarray] | None = None, node_pb: Tensor | Dict[str, Tensor] | None = None, edge_pb: Tensor | Dict[Tuple[str, str, str], Tensor] | None = None, node_feat_pb: Tensor | Dict[str, Tensor] | None = None, edge_feat_pb: Tensor | Dict[Tuple[str, str, str], Tensor] | None = None)[source]

Bases: Dataset

Graph and feature dataset with distributed partition info.

load(root_dir: str, partition_idx: int, graph_mode: str = 'ZERO_COPY', feature_with_gpu: bool = True, device_group_list: List[DeviceGroup] | None = None, whole_node_label_file: str | Dict[str, str] | None = None, device: int | None = None)[source]

Load a certain dataset partition from partitioned files and create in-memory objects (Graph, Feature or torch.Tensor).

Parameters:
  • root_dir (str) – The directory path to load the graph and feature partition data.

  • partition_idx (int) – Partition idx to load.

  • graph_mode (str) – Mode for creating graphlearn_torch’s Graph, including ‘CPU’, ‘ZERO_COPY’ or ‘CUDA’. (default: ‘ZERO_COPY’)

  • feature_with_gpu (bool) – Whether the created Feature objects of node/edge features use UnifiedTensor. If True, each Feature consists of a UnifiedTensor; otherwise each Feature is a PyTorch CPU Tensor, and device_group_list and device do not apply. (default: True)

  • device_group_list (List[DeviceGroup], optional) – A list of device groups used for feature lookups. The GPU part of the feature data is replicated on each device group in this list during initialization. GPUs with peer-to-peer access to each other should be placed in the same device group. (default: None)

  • whole_node_label_file (str) – The path to the whole node labels which are not partitioned. (default: None)

  • device – The target cuda device rank used for graph operations when graph mode is not “CPU” and feature lookups when the GPU part is not None. (default: None)
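The load arguments above can be checked before the call is made. The helper below is a hypothetical wrapper, not part of the library: it validates graph_mode against the three documented values and forwards only documented keyword arguments to a dataset object that is assumed to be a DistDataset.

```python
VALID_GRAPH_MODES = ("CPU", "ZERO_COPY", "CUDA")


def load_partition(dataset, root_dir, partition_idx,
                   graph_mode="ZERO_COPY", feature_with_gpu=True):
    """Validate graph_mode, then call dataset.load with documented arguments.

    `dataset` is assumed to be a DistDataset instance (or any object with a
    compatible `load` method).
    """
    if graph_mode not in VALID_GRAPH_MODES:
        raise ValueError(f"graph_mode must be one of {VALID_GRAPH_MODES}")
    dataset.load(
        root_dir=root_dir,
        partition_idx=partition_idx,
        graph_mode=graph_mode,
        feature_with_gpu=feature_with_gpu,
    )
    return dataset
```

For a CPU-only machine, graph_mode="CPU" together with feature_with_gpu=False would be the natural combination under the parameter descriptions above.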

share_ipc()[source]
classmethod from_ipc_handle(ipc_handle)[source]
property node_feat_pb
property edge_feat_pb
rebuild_dist_dataset(ipc_handle)[source]
reduce_dist_dataset(dataset: DistDataset)[source]

dist_feature

class RpcFeatureLookupCallee(dist_feature)[source]

Bases: RpcCalleeBase

A wrapper for rpc callee that will perform feature lookup from remote processes.

call(*args, **kwargs)[source]

The real processing entry for rpc requests; it must be overridden.

class DistFeature(num_partitions: int, partition_idx: int, local_feature: Feature | Dict[str | Tuple[str, str, str], Feature], feature_pb: Tensor | Dict[str, Tensor] | Dict[Tuple[str, str, str], Tensor], local_only: bool = False, rpc_router: RpcDataPartitionRouter | None = None, device: device | None = None)[source]

Bases: object

Distributed feature data manager for global feature lookups.

Parameters:
  • num_partitions – Number of data partitions.

  • partition_idx – Data partition idx of current process.

  • local_feature – Local Feature instance.

  • feature_pb – Partition book that maps node/edge ids to worker node ids in the feature store.

  • local_only – Use this instance only for local feature lookup or stitching. If set to True, the related rpc callee will not be registered and users should ensure that lookups for remote features are not invoked through this instance. Default to False.

  • device – Device used for computing. Default to None.

Note that local_feature and feature_pb should be dictionaries for hetero data.

local_get(ids: Tensor, input_type: str | Tuple[str, str, str] | None = None) Tensor[source]

Look up features in the local feature store. All input node/edge ids must be local to the current feature store.

async_get(ids: Tensor, input_type: str | Tuple[str, str, str] | None = None) Future[source]

Lookup features asynchronously and return a future.
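The role of the partition book in a global lookup can be illustrated with plain Python lists. This is a sketch of the general idea (group ids by owning partition, look each group up, stitch the results back into input order), not the library's implementation, which works on tensors and RPC; group_by_partition, stitch and the fetch callback are hypothetical names.

```python
from collections import defaultdict


def group_by_partition(ids, partition_book):
    """Group ids by owning partition, remembering each id's input position."""
    groups = defaultdict(lambda: ([], []))
    for pos, idx in enumerate(ids):
        part_ids, positions = groups[partition_book[idx]]
        part_ids.append(idx)
        positions.append(pos)
    return dict(groups)


def stitch(groups, fetch, num_ids):
    """Look up each group with fetch(partition, ids) and restore input order."""
    out = [None] * num_ids
    for partition, (part_ids, positions) in groups.items():
        for pos, feat in zip(positions, fetch(partition, part_ids)):
            out[pos] = feat
    return out


# Toy data: partition_book[i] is the partition that owns id i.
partition_book = [0, 1, 0, 1, 1, 0]
groups = group_by_partition([4, 0, 3, 2], partition_book)
# The fetch callback stands in for a local or remote feature lookup.
features = stitch(groups, lambda part, ids: [f"p{part}:{i}" for i in ids], 4)
```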

dist_graph

class DistGraph(num_partitions: int, partition_idx: int, local_graph: Graph | Dict[Tuple[str, str, str], Graph], node_pb: Tensor | Dict[str, Tensor], edge_pb: Tensor | Dict[Tuple[str, str, str], Tensor])[source]

Bases: object

Simple wrapper for graph data with distributed context.

TODO: support graph operations.

Parameters:
  • num_partitions – Number of data partitions.

  • partition_idx – Data partition idx of current process.

  • local_graph – local Graph instance.

  • node_pb – Partition book which records vertex ids to worker node ids.

  • edge_pb – Partition book which records edge ids to worker node ids.

Note that local_graph, node_pb and edge_pb should be dictionaries for hetero data.

get_local_graph(etype: Tuple[str, str, str] | None = None)[source]

Get a Graph obj of a specific edge type.

get_node_partitions(ids: Tensor, ntype: str | None = None)[source]

Get the partition ids of node ids with a specific node type.

get_edge_partitions(eids: Tensor, etype: Tuple[str, str, str] | None = None)[source]

Get the partition ids of edge ids with a specific edge type.

dist_neighbor_loader

class DistNeighborLoader(data: DistDataset | None, num_neighbors: List[int] | Dict[Tuple[str, str, str], List[int]], input_nodes: Tensor | str | Tuple[str, Tensor], batch_size: int = 1, shuffle: bool = False, drop_last: bool = False, with_edge: bool = False, collect_features: bool = False, to_device: device | None = None, worker_options: CollocatedDistSamplingWorkerOptions | MpDistSamplingWorkerOptions | RemoteDistSamplingWorkerOptions | None = None)[source]

Bases: DistLoader

A distributed loader that performs sampling from nodes.

Parameters:
  • data (DistDataset, optional) – The DistDataset object of a partition of graph data and feature data, along with distributed partition books. The input dataset must be provided in non-server distribution mode.

  • num_neighbors (List[int] or Dict[Tuple[str, str, str], List[int]]) – The number of neighbors to sample for each node in each iteration. In heterogeneous graphs, may also take in a dictionary denoting the number of neighbors to sample for each individual edge type.

  • input_nodes (torch.Tensor or Tuple[str, torch.Tensor]) – The node seeds for which neighbors are sampled to create mini-batches. In heterogeneous graphs, needs to be passed as a tuple that holds the node type and node seeds.

  • batch_size (int) – How many samples per batch to load (default: 1).

  • shuffle (bool) – Set to True to have the data reshuffled at every epoch (default: False).

  • drop_last (bool) – Set to True to drop the last incomplete batch, if the dataset size is not divisible by the batch size. If False and the size of dataset is not divisible by the batch size, then the last batch will be smaller. (default: False).

  • with_edge (bool) – Set to True to sample with edge ids and also include them in the sampled results. (default: False).

  • collect_features (bool) – Set to True to collect features for nodes of each sampled subgraph. (default: False).

  • to_device (torch.device, optional) – The target device that the sampled results should be copied to. If set to None, the current cuda device (got by torch.cuda.current_device) will be used if available, otherwise, the cpu device will be used. (default: None).

  • worker_options (optional) – The options for launching sampling workers. (1) If set to None or provided with a CollocatedDistSamplingWorkerOptions object, a single collocated sampler will be launched on the current process, and the separate sampling mode will be disabled. (2) If provided with a MpDistSamplingWorkerOptions object, the sampling workers will be launched on spawned subprocesses, and a shared-memory based channel will be created for passing sampled messages from the multiprocessing workers to the current loader. (3) If provided with a RemoteDistSamplingWorkerOptions object, the sampling workers will be launched on remote sampling server nodes, and a remote channel will be created for cross-machine message passing. (default: None).
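The parameters above can be put together as in the following sketch. num_batches spells out the documented drop_last rule, and build_loader shows a loader configured with multiprocessing sampling workers. Both helper names are hypothetical, dist is assumed to be the graphlearn_torch.distributed module, and the fan-out, worker count and batch size are arbitrary illustration values.

```python
def num_batches(num_seeds, batch_size, drop_last=False):
    """Number of mini-batches per epoch under the documented drop_last rule."""
    full, remainder = divmod(num_seeds, batch_size)
    return full if drop_last or remainder == 0 else full + 1


def build_loader(dist, dataset, seeds, batch_size=512):
    """Create a loader whose sampling runs in two spawned subprocesses.

    `dist` is assumed to be the graphlearn_torch.distributed module and
    `dataset` a loaded DistDataset partition.
    """
    options = dist.MpDistSamplingWorkerOptions(num_workers=2, worker_concurrency=4)
    return dist.DistNeighborLoader(
        data=dataset,
        num_neighbors=[15, 10],  # neighbors per node in iteration 1 and 2
        input_nodes=seeds,
        batch_size=batch_size,
        shuffle=True,
        collect_features=True,
        worker_options=options,
    )
```

Passing worker_options=None instead would select the collocated mode described in case (1).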

dist_neighbor_sampler

class PartialNeighborOutput(index: Tensor, output: NeighborOutput)[source]

Bases: object

The sampled neighbor output of a subset of the original ids.

  • index: the index of the subset vertex ids.

  • output: the sampled neighbor output.

index: Tensor
output: NeighborOutput
class RpcSamplingCallee(sampler: NeighborSampler, device: device)[source]

Bases: RpcCalleeBase

A wrapper for rpc callee that will perform rpc sampling from remote processes.

call(*args, **kwargs)[source]

The real processing entry for rpc requests; it must be overridden.

class RpcSubGraphCallee(sampler: NeighborSampler, device: device)[source]

Bases: RpcCalleeBase

A wrapper for rpc callee that will perform rpc sampling from remote processes.

call(*args, **kwargs)[source]

The real processing entry for rpc requests; it must be overridden.

class DistNeighborSampler(data: DistDataset, num_neighbors: List[int] | Dict[Tuple[str, str, str], List[int]] | None = None, with_edge: bool = False, with_neg: bool = False, collect_features: bool = False, channel: ChannelBase | None = None, concurrency: int = 1, device: device | None = None)[source]

Bases: ConcurrentEventLoop

Asynchronous and distributed neighbor sampler.

Parameters:
  • data (DistDataset) – The graph and feature data with partition info.

  • num_neighbors (NumNeighbors) – The number of sampling neighbors on each hop.

  • with_edge (bool) – Whether to sample with edge ids. (default: False).

  • collect_features (bool) – Whether to collect features for sampled results. (default: False).

  • channel (ChannelBase, optional) – The message channel to send sampled results. If set to None, the sampled results will be returned directly with sample_from_nodes. (default: None).

  • concurrency (int) – The max number of concurrent seed batches processed by the current sampler. (default: 1).

  • device – The device to use for sampling. If set to None, the current cuda device (got by torch.cuda.current_device) will be used if available, otherwise, the cpu device will be used. (default: None).

sample_from_nodes(inputs: NodeSamplerInput, **kwargs) Dict[str, Tensor] | None[source]

Sample multi-hop neighbors from nodes, collect the remote features (optional), and send results to the output channel.

Note that if the output sample channel is specified, this function is asynchronous and the sampled result is not returned directly. Otherwise, this function blocks until the sampled result is ready and returns it.

Parameters:

inputs (NodeSamplerInput) – The input data with node indices to start sampling from.

sample_from_edges(inputs: EdgeSamplerInput, **kwargs) Dict[str, Tensor] | None[source]

Sample multi-hop neighbors from edges, collect the remote features (optional), and send results to the output channel.

Note that if the output sample channel is specified, this function is asynchronous and the sampled result is not returned directly. Otherwise, this function blocks until the sampled result is ready and returns it.

Parameters:

inputs (EdgeSamplerInput) – The input data for sampling from edges including the (1) source node indices, the (2) destination node indices, the (3) optional edge labels and the (4) input edge type.

subgraph(inputs: NodeSamplerInput, **kwargs) Dict[str, Tensor] | None[source]

Induce an enclosing subgraph based on inputs and their neighbors (if self.num_neighbors is not None).

dist_options

class CollocatedDistSamplingWorkerOptions(master_addr: str | None = None, master_port: int | str | None = None, num_rpc_threads: int | None = None, rpc_timeout: float = 180)[source]

Bases: _BasicDistSamplingWorkerOptions

Options for launching a single distributed sampling worker collocated with the current process.

Parameters:
  • master_addr (str, optional) – Master address for rpc initialization across all sampling workers. (default: None).

  • master_port (str or int, optional) – Master port for rpc initialization across all sampling workers. (default: None).

  • num_rpc_threads (int, optional) – Number of threads used for rpc agent on each sampling worker. (default: None).

  • rpc_timeout (float) – The timeout in seconds for rpc requests. (default: 180).

Please refer to _BasicDistSamplingWorkerOptions for more detailed comments on the related input arguments.

class MpDistSamplingWorkerOptions(num_workers: int = 1, worker_devices: List[device] | None = None, worker_concurrency: int = 4, master_addr: str | None = None, master_port: int | str | None = None, num_rpc_threads: int | None = None, rpc_timeout: float = 180, channel_size: int | str | None = None, pin_memory: bool = False)[source]

Bases: _BasicDistSamplingWorkerOptions

Options for launching distributed sampling workers with multiprocessing.

Note that if MpDistSamplingWorkerOptions is used, all sampling workers will be launched on subprocesses spawned by torch.multiprocessing. Thus, a shared-memory based channel should be created for passing the sampled results, which are produced by those multiprocessing sampling workers and consumed by the current process.

Parameters:
  • num_workers (int) – How many workers to use (subprocesses to spawn) for distributed neighbor sampling of the current process. (default: 1).

  • worker_devices (torch.device or List[torch.device], optional) – List of devices assigned to workers of this group. (default: None).

  • worker_concurrency (int) – The max sampling concurrency for each sampling worker. (default: 4).

  • master_addr (str, optional) – Master address for rpc initialization across all sampling workers. (default: None).

  • master_port (str or int, optional) – Master port for rpc initialization across all sampling workers. (default: None).

  • num_rpc_threads (int, optional) – Number of threads used for rpc agent on each sampling worker. (default: None).

  • rpc_timeout (float) – The timeout in seconds for rpc requests. (default: 180).

  • channel_size (int or str) – The shared-memory buffer size (bytes) allocated for the channel. If set to None, num_workers * 64MB will be used. (default: None).

  • pin_memory (bool) – Set to True to register the underlying shared memory for cuda, which will achieve better performance if you want to copy loaded data from channel to cuda device. (default: False).

Please refer to _BasicDistSamplingWorkerOptions for more detailed comments on the related input arguments.
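The default channel size rule stated for channel_size (num_workers * 64MB when set to None) works out as follows. This is only arithmetic on the documented rule: the helper name is hypothetical, and reading "MB" as 2**20 bytes is an assumption, since the unit is not defined in this reference.

```python
MB = 1024 * 1024  # assumption: "MB" is read here as 2**20 bytes


def default_channel_size(num_workers):
    """Bytes used when channel_size is None: num_workers * 64MB."""
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    return num_workers * 64 * MB
```

With four sampling workers this gives 268435456 bytes, so the default buffer grows linearly with the number of workers.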

class RemoteDistSamplingWorkerOptions(server_rank: int | None = None, num_workers: int = 1, worker_devices: List[device] | None = None, worker_concurrency: int = 4, master_addr: str | None = None, master_port: int | str | None = None, num_rpc_threads: int | None = None, rpc_timeout: float = 180, buffer_size: int | str | None = None, prefetch_size: int = 4)[source]

Bases: _BasicDistSamplingWorkerOptions

Options for launching distributed sampling workers on remote servers.

Note that if RemoteDistSamplingWorkerOptions is used, all sampling workers will be launched on remote servers. Thus, a cross-machine based channel will be created for message passing of sampled results, which are produced by those remote sampling workers and consumed by the current process.

Parameters:
  • server_rank (int) – The rank of server to launch sampling workers. If set to None, it will be automatically assigned. (default: None).

  • num_workers (int) – How many workers to launch on the remote server for distributed neighbor sampling of the current process. (default: 1).

  • worker_devices (torch.device or List[torch.device], optional) – List of devices assigned to workers of this group. (default: None).

  • worker_concurrency (int) – The max sampling concurrency for each sampling worker. (default: 4).

  • master_addr (str, optional) – Master address for rpc initialization across all sampling workers. (default: None).

  • master_port (str or int, optional) – Master port for rpc initialization across all sampling workers. (default: None).

  • num_rpc_threads (int, optional) – Number of threads used for rpc agent on each sampling worker. (default: None).

  • rpc_timeout (float) – The timeout in seconds for rpc requests. (default: 180).

  • buffer_size (int or str) – The size (bytes) allocated for the server-side buffer. If set to None, num_workers * 64MB will be used. (default: None).

  • prefetch_size (int) – The max prefetched sampled messages for consuming on the client side. (default: 4).

dist_sampling_producer

MP_STATUS_CHECK_INTERVAL = 5.0

Interval (in seconds) to check status of processes to avoid hanging in multiprocessing sampling.

class MpCommand(value)[source]

Bases: Enum

Enum class for multiprocessing sampling command

SAMPLE_ALL = 0
STOP = 1
class DistMpSamplingProducer(data: DistDataset, sampler_input: NodeSamplerInput | EdgeSamplerInput, sampling_config: SamplingConfig, worker_options: _BasicDistSamplingWorkerOptions, output_channel: ChannelBase)[source]

Bases: object

A subprocess group of distributed sampling workers.

Note that this producer is only used for workloads with separate sampling and training. All sampled results will be sent to the output channel.

init()[source]

Create the subprocess pool. Init samplers and rpc server.

shutdown()[source]

Shutdown sampler event loop and rpc server. Join the subprocesses.

produce_all()[source]

Perform sampling for all input seeds.

class DistCollocatedSamplingProducer(data: DistDataset, sampler_input: NodeSamplerInput | EdgeSamplerInput, sampling_config: SamplingConfig, worker_options: _BasicDistSamplingWorkerOptions, device: device)[source]

Bases: object

A sampling producer with a collocated distributed sampler.

Note that the sampled results will be returned directly and this producer will be blocking when processing each batch.

init()[source]
shutdown()[source]
reset()[source]
sample()[source]

dist_server

SERVER_EXIT_STATUS_CHECK_INTERVAL = 5.0

Interval (in seconds) to check exit status of server.

class DistServer(dataset: DistDataset)[source]

Bases: object

A server that supports launching remote sampling workers for training clients.

Note that this server is enabled only when the distribution mode is a server-client framework, and the graph and feature store will be partitioned and managed by all server nodes.

Parameters:

dataset (DistDataset) – The DistDataset object of a partition of graph data and feature data, along with distributed partition books.

shutdown()[source]
wait_for_exit()[source]

Block until the exit flag has been set to True.

exit()[source]

Set the exit flag to True.

get_dataset_meta()[source]

Get the meta info of the distributed dataset managed by the current server, including partition info and graph types.

create_sampling_producer(sampler_input: NodeSamplerInput | EdgeSamplerInput, sampling_config: SamplingConfig, worker_options: RemoteDistSamplingWorkerOptions) int[source]

Create and initialize an instance of DistSamplingProducer with a group of subprocesses for distributed sampling.

Returns:

A unique id of created sampling producer on this server.

destroy_sampling_producer(producer_id: int)[source]

Shutdown and destroy a sampling producer managed by this server with its producer id.

start_new_epoch_sampling(producer_id: int)[source]

Start the sampling tasks of a new epoch for a specific sampling producer with its producer id.

fetch_one_sampled_message(producer_id: int)[source]

Fetch a sampled message from the buffer of a specific sampling producer with its producer id.

get_server() DistServer[source]

Get the DistServer instance on the current process.

init_server(num_servers: int, num_clients: int, server_rank: int, dataset: DistDataset, master_addr: str, master_port: int, num_rpc_threads: int = 16, request_timeout: int = 180, server_group_name: str | None = None)[source]

Initialize the current process as a server and establish connections with all other servers and clients. Note that this method should be called only in the server-client distribution mode.

Parameters:
  • num_servers (int) – Number of processes participating in the server group.

  • num_clients (int) – Number of processes participating in the client group.

  • server_rank (int) – Rank of the current process within the server group (it should be a number between 0 and num_servers-1).

  • dataset (DistDataset) – The DistDataset object of a partition of graph data and feature data, along with distributed partition book info.

  • master_addr (str) – The master TCP address for the RPC connection between all servers and clients. The value must be the same on all servers and clients.

  • master_port (int) – The master TCP port for the RPC connection between all servers and clients. The value must be the same on all servers and clients.

  • num_rpc_threads (int) – The number of RPC worker threads used by the current server to respond to remote requests. (Default: 16).

  • request_timeout (int) – The maximum time in seconds to wait for a remote request; if it is exceeded, an exception will be raised. (Default: 180).

  • server_group_name (str) – A unique name of the server group that current process belongs to. If set to None, a default name will be used. (Default: None).

wait_and_shutdown_server()[source]

Block until all clients have been shut down, then shut down the server on the current process and destroy all RPC connections.
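The server side of the server-client mode described above can be sketched in a few lines: initialize the server with its dataset partition, then block until the clients are done. run_server is a hypothetical helper, and dist is assumed to be the graphlearn_torch.distributed module, passed in so the call order can be checked without a cluster.

```python
def run_server(dist, dataset, num_servers, num_clients, server_rank,
               master_addr, master_port):
    """Serve one dataset partition until every client has shut down.

    `dist` is assumed to be the graphlearn_torch.distributed module and
    `dataset` an already loaded DistDataset partition.
    """
    # server_rank must lie in [0, num_servers - 1], as documented above.
    if not 0 <= server_rank < num_servers:
        raise ValueError(f"server_rank {server_rank} not in [0, {num_servers - 1}]")
    dist.init_server(
        num_servers=num_servers,
        num_clients=num_clients,
        server_rank=server_rank,
        dataset=dataset,
        master_addr=master_addr,  # same value on every server and client
        master_port=master_port,  # same value on every server and client
    )
    # Blocks until all clients have shut down, then tears down RPC.
    dist.wait_and_shutdown_server()
```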

event_loop

wrap_torch_future(f: Future) Future[source]

Convert a torch future to a standard asyncio future.

class ConcurrentEventLoop(concurrency)[source]

Bases: object

Concurrent event loop context.

Parameters:

concurrency – max processing concurrency.

start_loop()[source]
shutdown_loop()[source]
wait_all()[source]

Wait for all pending tasks to finish.

add_task(coro, callback=None)[source]

Add an asynchronous coroutine task to run.

Parameters:
  • coro – the async coroutine func.

  • callback – the callback func applied on the returned results after the coroutine task is finished.

Note that any results returned by the callback func will be ignored, so it is preferable to handle everything inside your callback func and not return any results.

run_task(coro)[source]

Run a coroutine task synchronously.
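The method set above (start_loop, add_task with a callback, wait_all, run_task, shutdown_loop) can be illustrated with a small stand-in built on asyncio. MiniEventLoop is a hypothetical class written for this example, not the library's ConcurrentEventLoop: it runs an asyncio loop on a background thread, caps concurrency with a semaphore, and takes coroutine objects, which is an assumption about how tasks are passed.

```python
import asyncio
import threading


class MiniEventLoop:
    """Hypothetical stand-in: an asyncio loop on a background thread."""

    def __init__(self, concurrency):
        self._concurrency = concurrency
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._sem = None      # created lazily on the loop thread
        self._pending = []

    def start_loop(self):
        self._thread.start()

    def shutdown_loop(self):
        self.wait_all()
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()

    async def _guarded(self, coro, callback):
        if self._sem is None:
            self._sem = asyncio.Semaphore(self._concurrency)
        async with self._sem:  # at most `concurrency` tasks run at once
            result = await coro
        if callback is not None:
            callback(result)   # the callback's return value is ignored

    def add_task(self, coro, callback=None):
        fut = asyncio.run_coroutine_threadsafe(
            self._guarded(coro, callback), self._loop)
        self._pending.append(fut)

    def wait_all(self):
        for fut in self._pending:
            fut.result()       # blocks until the task (and callback) finished
        self._pending.clear()

    def run_task(self, coro):
        # Synchronous: submit the coroutine and block for its result.
        return asyncio.run_coroutine_threadsafe(coro, self._loop).result()
```

A typical use is to call add_task for each seed batch with a callback that stores or forwards the result, then call wait_all before shutting the loop down.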

rpc

rpc_is_initialized()[source]

Check whether rpc has been initialized on the current process.

get_rpc_master_addr()[source]

Get the master address for rpc communication on the current process.

get_rpc_master_port()[source]

Get the master port for rpc communication on the current process.

get_rpc_current_group_worker_names() List[str][source]

Get the rpc worker names (sorted by rank) of the current group.

get_rpc_worker_names() Dict[DistRole, List[str]][source]

Get the rpc worker names (each sorted by rank) of each role group.

all_gather(obj, timeout=None)[source]

Gathers objects only from the current role group in a list. This function blocks until all workers in the current role group have received the gathered results. The implementation of this method is based on torch.distributed.rpc.api._all_gather.

barrier(timeout=None)[source]

Block until all local and remote RPC processes in the current role group reach this method.

global_all_gather(obj, timeout=None)[source]

Gathers objects from all role groups in a list, using the implementation of torch.distributed.rpc.api._all_gather.

global_barrier(timeout=None)[source]

Block until all local and remote RPC processes across all role groups reach this method.

init_rpc(master_addr: str, master_port: int, num_rpc_threads: int = 16, rpc_timeout: float = 180)[source]

Initialize rpc on the current process.

shutdown_rpc(graceful=True)[source]

Shutdown rpc agent on the current process.

If graceful is set to False, other mechanisms should ensure that all rpc requests are completed before shutting down rpc servers.

class RpcDataPartitionRouter(partition2workers: List[List[str]])[source]

Bases: object

A router that can select a remote rpc worker with a certain data partition to perform a rpc request.

get_to_worker(data_partition_idx: int) str[source]
rpc_sync_data_partitions(num_data_partitions: int, current_partition_idx: int)[source]

Synchronize the data partition info across all workers only in the current role group.

Note that all data should be partitioned and used with a single role group.

Parameters:
  • num_data_partitions (int) – The number of all data partitions.

  • current_partition_idx (int) – The data partition idx that the current process is responsible for. Some computation tasks on this data partition may be sent to the current process from remote workers.
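The relationship between synchronized partition info and a partition router can be sketched without any RPC. Below, a mapping from worker name to partition idx (what a synchronization step might collect) is turned into the List[List[str]] shape that RpcDataPartitionRouter takes, and a router picks a worker for a partition. The helper, the RoundRobinRouter class, the worker names and the round-robin policy are all assumptions made for illustration.

```python
import itertools


def build_partition2workers(gathered, num_data_partitions):
    """Turn {worker_name: partition_idx} into a per-partition worker list."""
    partition2workers = [[] for _ in range(num_data_partitions)]
    for worker_name, partition_idx in sorted(gathered.items()):
        partition2workers[partition_idx].append(worker_name)
    return partition2workers


class RoundRobinRouter:
    """Hypothetical router; round-robin is an assumed selection policy."""

    def __init__(self, partition2workers):
        for idx, workers in enumerate(partition2workers):
            if not workers:
                raise ValueError(f"no worker serves data partition {idx}")
        self._cycles = [itertools.cycle(workers) for workers in partition2workers]

    def get_to_worker(self, data_partition_idx):
        # Return the next worker that holds the requested data partition.
        return next(self._cycles[data_partition_idx])


# Worker names below are made up for illustration.
gathered = {"worker-0": 0, "worker-1": 1, "worker-2": 0}
router = RoundRobinRouter(build_partition2workers(gathered, 2))
```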

class RpcCalleeBase[source]

Bases: ABC

A wrapper base for rpc callee that will perform rpc requests from remote processes.

Note that the callee will be called only from rpc workers in the current role group.

abstract call(*args, **kwargs)[source]

The real processing entry for rpc requests; it must be overridden.

rpc_register(callee: RpcCalleeBase)[source]

Register a callee for rpc requests only in the current role group. This method blocks until all local and remote RPC processes of the current role group reach it.

rpc_request_async(worker_name, callee_id, args=None, kwargs=None)[source]

Perform a rpc request asynchronously within the current role group and return a future.

rpc_request(worker_name, callee_id, args=None, kwargs=None)[source]

Perform a rpc request synchronously within the current role group and return the results.
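The callee pattern above (subclass the base, override call, register it, then address it by id) can be shown with a purely local stand-in. Nothing here performs real RPC: CalleeBase, register and request are hypothetical names, and the assumption that registration yields the callee id used by later requests is not stated in this reference.

```python
from abc import ABC, abstractmethod


class CalleeBase(ABC):
    """Hypothetical stand-in for the documented RpcCalleeBase."""

    @abstractmethod
    def call(self, *args, **kwargs):
        """Process one request; subclasses must override this."""


class AddCallee(CalleeBase):
    def call(self, a, b):
        return a + b


_callees = []


def register(callee):
    """Store a callee and return its id (an assumed return value)."""
    _callees.append(callee)
    return len(_callees) - 1


def request(callee_id, args=None, kwargs=None):
    """Dispatch to a registered callee, mirroring rpc_request's argument shape."""
    return _callees[callee_id].call(*(args or ()), **(kwargs or {}))


callee_id = register(AddCallee())
```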

rpc_global_request_async(target_role: DistRole, role_rank: int, func, args=None, kwargs=None)[source]

Perform a rpc request asynchronously to another rpc worker in an arbitrary role group and return a future.

rpc_global_request(target_role: DistRole, role_rank: int, func, args=None, kwargs=None)[source]

Perform a rpc request synchronously to another rpc worker in an arbitrary role group and return the results.