Distributed Training
GLT supports efficient distributed training across multiple GPUs or machines. GNN training is essentially an asynchronous producer-consumer pipeline: sampling produces mini-batches and training consumes them. The optimization objective is therefore overall training throughput. Accordingly, the GLT architecture is divided into two modules, sampling (sampler) and training (trainer). GLT supports two deployment modes in the distributed setting. In the Worker Mode, the sampler and trainer are deployed on the same physical node, so every node runs both. In the Server-Client Mode, samplers and trainers are deployed on separate sets of physical nodes in the cluster. Refer to the Distributed Training Tutorial for a more detailed introduction to distributed training in GLT. Next, we use a simple example to demonstrate distributed training with GLT in the worker mode.
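The producer-consumer structure can be illustrated with a toy sketch. In this sketch, Python threads stand in for samplers and a bounded `queue.Queue` stands in for the message channel. GLT itself runs samplers in separate processes, so this shows only the pattern, not GLT's implementation. The names `sampler` and `trainer` are hypothetical.

```python
import queue
import threading


def sampler(worker_id, seeds, out_q, batch_size):
    """Producer: cut this worker's seeds into mini-batches and send them."""
    for start in range(0, len(seeds), batch_size):
        out_q.put((worker_id, seeds[start:start + batch_size]))  # blocks while the channel is full
    out_q.put(None)  # sentinel: this sampler has finished


def trainer(in_q, num_samplers):
    """Consumer: process batches in whatever order they arrive."""
    finished, consumed = 0, []
    while finished < num_samplers:
        item = in_q.get()
        if item is None:
            finished += 1
            continue
        _, batch = item
        consumed.extend(batch)  # stand-in for forward / backward / optimizer step
    return consumed


channel = queue.Queue(maxsize=4)  # bounded, so fast samplers cannot run unboundedly ahead
seeds = list(range(20))
shards = [seeds[0::2], seeds[1::2]]
threads = [threading.Thread(target=sampler, args=(i, s, channel, 3))
           for i, s in enumerate(shards)]
for t in threads:
    t.start()
consumed = trainer(channel, num_samplers=len(threads))
for t in threads:
    t.join()
print(sorted(consumed) == seeds)  # True
```

Because the queue is bounded, sampling and training overlap. Neither side waits for the other to finish an entire epoch, which is what makes throughput the natural metric.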
Dataset Preprocessing & Partitioning
We use the OGBN-Products dataset in this example. In distributed training under the worker mode, each node in the cluster holds one partition of the graph. Thus, before training starts, we partition OGBN-Products into multiple partitions, each of which corresponds to a specific training worker. GLT provides a preprocessing script for partitioning OGBN datasets.
Run the following commands to partition OGBN-Products into 2 partitions:
cd examples/distributed
python partition_ogbn_dataset.py --dataset=ogbn-products --num_partitions=2
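The sketch below shows what "one partition per worker" means at the level of node IDs. It uses a generic random partitioner and then gives each partition the training seeds it owns. The helpers `random_partition` and `split_train_idx` are hypothetical, and partition_ogbn_dataset.py may use a different partitioning strategy. Real partitions also carry edges and features, which are omitted here.

```python
import random


def random_partition(num_nodes, num_parts, seed=0):
    """Assign every node ID to a partition uniformly at random (illustrative scheme)."""
    rng = random.Random(seed)
    return [rng.randrange(num_parts) for _ in range(num_nodes)]


def split_train_idx(train_idx, node2part, num_parts):
    """Give each partition the training seeds whose nodes it owns."""
    parts = [[] for _ in range(num_parts)]
    for v in train_idx:
        parts[node2part[v]].append(v)
    return parts


node2part = random_partition(num_nodes=10, num_parts=2)
train_parts = split_train_idx([0, 2, 4, 6, 8], node2part, num_parts=2)
for pidx, part_seeds in enumerate(train_parts):
    print(f'partition{pidx}: {part_seeds}')
```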
GLT supports caching the graph topology and frequently accessed features in GPU memory to accelerate GPU sampling and feature collection. For the feature cache, we adopt a pre-sampling-based approach to estimate the hotness of vertices, that is, how often they are accessed during sampling. The features of the hottest vertices are cached while loading the graph. Uncached feature data are stored in pinned memory for efficient access via UVA (Unified Virtual Addressing). The partition_ogbn_dataset.py script also computes the hotness distribution of vertices and selects the vertices whose features are cached. The cache capacity is determined by a user-specified cache ratio. To partition your own dataset for distributed training, refer to the Preprocessing and Data Partitioning section of this tutorial.
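Pre-sampling-based cache selection can be sketched as follows. The sketch runs a few rounds of neighbor sampling, counts how often each vertex is touched, and keeps the top `num_nodes * cache_ratio` vertices. It assumes one-hop sampling and a plain adjacency dict. The helpers `presample_hotness` and `select_cached` are hypothetical, and GLT's actual pre-sampling may use multiple hops and other details.

```python
import random
from collections import Counter


def presample_hotness(adj, seeds, fanout, rounds, seed=0):
    """Count how often each vertex is touched by one-hop neighbor pre-sampling."""
    rng = random.Random(seed)
    hits = Counter()
    for _ in range(rounds):
        for s in seeds:
            hits[s] += 1  # the seed's own feature is fetched
            nbrs = adj.get(s, [])
            for v in rng.sample(nbrs, min(fanout, len(nbrs))):
                hits[v] += 1  # each sampled neighbor's feature is fetched
    return hits


def select_cached(hits, num_nodes, cache_ratio):
    """Return the hottest vertices, at most num_nodes * cache_ratio of them."""
    capacity = int(num_nodes * cache_ratio)
    return [v for v, _ in hits.most_common(capacity)]


# Node 0 is a hub that every seed links to, so it should be cached first.
adj = {1: [0, 2], 2: [0, 3], 3: [0, 1], 4: [0]}
hits = presample_hotness(adj, seeds=[1, 2, 3, 4], fanout=2, rounds=3)
cached = select_cached(hits, num_nodes=5, cache_ratio=0.2)
print(cached)  # [0]
```

Features of the vertices in `cached` would go to GPU memory. The rest would stay in pinned host memory.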
Graph Loading
Once the graph partitioning is done, we can start loading the graph data from the partitioned files for each node.
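import os.path as osp
import torch
import graphlearn_torch as glt
# `args` holds the parsed command-line arguments of the full script.
print('--- Loading data partition ...')
root_dir = osp.join(osp.dirname(osp.realpath(__file__)), args.ogbn_products_root_dir)
# Each node loads one partition; nodes wrap around if there are more nodes than partitions.
data_pidx = args.node_rank % args.num_dataset_partitions
dataset = glt.distributed.DistDataset()
dataset.load(
    graph_dir=osp.join(root_dir, 'ogbn-products-partitions'),
    partition_idx=data_pidx,
    graph_mode='ZERO_COPY',
    whole_node_label_file=osp.join(root_dir, 'ogbn-products-label', 'label.pt')
)
# Training (seed) nodes that belong to this partition.
train_idx = torch.load(
    osp.join(root_dir, 'ogbn-products-train-partitions', f'partition{data_pidx}.pt')
)
# Sampling and training processes both read train_idx, so place it in shared memory.
train_idx.share_memory_()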
The graphlearn_torch.distributed.DistDataset manages the in-memory storage of the partitioned graph topology and features. Calling load places the graph in CPU or GPU memory. Its partition_idx argument is the index of the partition to load. Its graph_mode argument determines both where the topology is stored (e.g., CPU memory or GPU memory) and which device performs sampling (CPU or GPU). Note that when the topology is stored in ordinary (non-pinned) CPU memory, sampling can only be performed on the CPU. When the topology is stored in pinned CPU memory or GPU memory, sampling must be performed on the GPU. The ZERO_COPY mode stores the topology in pinned memory, which the GPU accesses via UVA. We also load train_idx, the training nodes of this partition. Because multiple processes are involved in sampling and training, we move train_idx to shared memory.
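The rule linking graph_mode to topology placement and sampling device can be written as a small lookup table. Only 'ZERO_COPY' appears in this example. The mode names 'CPU' and 'CUDA' below are assumptions for illustration, as is the helper `placement`.

```python
def placement(graph_mode):
    """Return (where the topology lives, which device samples) for a graph mode.

    Only 'ZERO_COPY' comes from the example; 'CPU' and 'CUDA' are assumed names.
    """
    rules = {
        'CPU': ('CPU memory (pageable)', 'cpu'),                 # CPU-only sampling
        'ZERO_COPY': ('pinned CPU memory, read via UVA', 'cuda'),  # GPU sampling
        'CUDA': ('GPU memory', 'cuda'),                          # GPU sampling
    }
    if graph_mode not in rules:
        raise ValueError(f'unknown graph_mode: {graph_mode!r}')
    return rules[graph_mode]


for mode in ('CPU', 'ZERO_COPY', 'CUDA'):
    print(mode, '->', placement(mode))
```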
Training Process
The model training pipeline is wrapped in the function run_training_proc, which is executed by each training process spawned on every node. This function consists of the following steps:
1. Initialize the distributed worker group context for GLT.
glt.distributed.init_worker_group(
    world_size=num_nodes * num_training_procs_per_node,
    rank=node_rank * num_training_procs_per_node + local_proc_rank,
    group_name='distributed-sage-supervised-trainer'
)
current_ctx = glt.distributed.get_context()
# One GPU per training process, assigned round-robin over the GPUs visible on this node.
current_device = torch.device('cuda', local_proc_rank % torch.cuda.device_count())
By calling init_worker_group, each training process registers three things:
- the world size (the total number of training processes across all nodes),
- its own global rank,
- the name of the worker group it joins.
This initialization creates a graphlearn_torch.distributed.DistContext object, which get_context returns. The object stores the metadata of the training process and of the worker group it belongs to.
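The rank arithmetic lays out global ranks node by node. The sketch below enumerates every training process for the two-node example. It assumes two training processes per node, matching the two GPUs each node receives in the launch commands. It also includes the data_pidx formula from the loading code. `rank_layout` is a hypothetical helper.

```python
def rank_layout(num_nodes, procs_per_node, gpus_per_node, num_partitions):
    """Reproduce the rank, device, and partition arithmetic used in the example."""
    world_size = num_nodes * procs_per_node
    rows = []
    for node_rank in range(num_nodes):
        data_pidx = node_rank % num_partitions  # partition loaded by this node
        for local_proc_rank in range(procs_per_node):
            rank = node_rank * procs_per_node + local_proc_rank
            device = local_proc_rank % gpus_per_node  # CUDA ordinal local to this node
            rows.append((node_rank, local_proc_rank, rank, device, data_pidx))
    return world_size, rows


world_size, rows = rank_layout(num_nodes=2, procs_per_node=2, gpus_per_node=2, num_partitions=2)
print('world_size =', world_size)
for row in rows:
    print('node %d, local %d -> rank %d, cuda:%d, partition %d' % row)
```

The CUDA ordinal is relative to CUDA_VISIBLE_DEVICES. On node 1, which is launched with CUDA_VISIBLE_DEVICES=2,3, cuda:0 is physical GPU 2.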
2. Initialize the training process group of PyTorch.
torch.distributed.init_process_group(
    backend='nccl',
    rank=current_ctx.rank,
    world_size=current_ctx.world_size,
    init_method='tcp://{}:{}'.format(master_addr, training_pg_master_port)
)
3. Create a distributed neighbor loader for training.
# Split this partition's seeds evenly among the training processes on this node.
train_idx = train_idx.split(train_idx.size(0) // num_training_procs_per_node)[local_proc_rank]
train_loader = glt.distributed.DistNeighborLoader(
    data=dataset,
    num_neighbors=[15, 10, 5],  # fan-out for each of the 3 hops
    input_nodes=train_idx,
    batch_size=batch_size,
    shuffle=True,
    collect_features=True,
    to_device=current_device,
    worker_options=glt.distributed.MpDistSamplingWorkerOptions(
        num_workers=2,  # sampling processes per training process
        worker_devices=[torch.device('cuda', i % torch.cuda.device_count()) for i in range(2)],
        worker_concurrency=4,
        master_addr=master_addr,
        master_port=train_loader_master_port,
        channel_size='1GB',
        pin_memory=True
    )
)
Each training process owns a graphlearn_torch.distributed.DistNeighborLoader, which handles distributed graph sampling and feature collection for that process. The worker_options argument holds the sampling-related settings:
- the number of sampling processes created for each training process,
- the devices on which sampling runs,
- the number of batches each sampling process can work on concurrently,
- RPC-related options (master_addr and master_port),
- the options for the message channel between the sampling and training processes (e.g., channel_size).
GLT provides two types of worker options for the two deployment modes:
- graphlearn_torch.distributed.MpDistSamplingWorkerOptions specifies how distributed samplers are created in the worker mode.
- graphlearn_torch.distributed.RemoteDistSamplingWorkerOptions specifies how distributed samplers are created in the server-client mode.
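The split in step 3 cuts train_idx into chunks of size len // num_training_procs_per_node, and each process takes the chunk at its local rank. If the length is not divisible, the leftover seeds land in an extra chunk that no process reads, so they are skipped. The list-based sketch below mimics that behavior. It also shows the round-robin mapping used for worker_devices. That expression does not depend on local_proc_rank, so on a node with two visible GPUs every training process places its two sampling workers on cuda:0 and cuda:1. `split_seeds` and `sampling_devices` are hypothetical helpers, not GLT APIs.

```python
def split_seeds(train_idx, procs_per_node, local_proc_rank):
    """Mimic train_idx.split(len // procs_per_node)[local_proc_rank] on a list."""
    chunk = len(train_idx) // procs_per_node
    # Like torch.split with an int size: fixed-size chunks, the last one may be shorter.
    chunks = [train_idx[i:i + chunk] for i in range(0, len(train_idx), chunk)]
    return chunks[local_proc_rank]


def sampling_devices(num_workers, gpus_visible):
    """Round-robin sampling workers over the visible GPUs, as worker_devices does."""
    return [f'cuda:{i % gpus_visible}' for i in range(num_workers)]


seeds = list(range(10))
per_proc = [split_seeds(seeds, 3, r) for r in range(3)]
print(per_proc)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8]]; seed 9 is in a 4th chunk nobody reads
print(sampling_devices(num_workers=2, gpus_visible=2))  # ['cuda:0', 'cuda:1']
```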
4. Define the model and optimizer.
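from torch.nn.parallel import DistributedDataParallel
from torch_geometric.nn import GraphSAGE
torch.cuda.set_device(current_device)
model = GraphSAGE(
    in_channels=100,      # ogbn-products node feature dimension
    hidden_channels=256,
    num_layers=3,         # one layer per sampling hop
    out_channels=47,      # number of product categories
).to(current_device)
# Wrap the model so gradients are synchronized across all training processes.
model = DistributedDataParallel(model, device_ids=[current_device.index])
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)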
We use GraphSAGE in this example, and adopt PyTorch’s Distributed Data Parallel (DDP) for distributed training.
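DDP keeps the replicas identical by averaging gradients across ranks before each optimizer step. The sketch below simulates that idea with two replicas of a one-parameter model, each trained on a different data shard. `allreduce_mean` is a hypothetical stand-in for the collective. Real DDP buckets gradients and overlaps the all-reduce with the backward pass, which this sketch does not model.

```python
def local_gradient(w, xs, ys):
    """d/dw of the mean squared error for the 1-D model y_hat = w * x."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def allreduce_mean(values):
    """Stand-in for an all-reduce that averages one value across all ranks."""
    avg = sum(values) / len(values)
    return [avg] * len(values)


# Two replicas start from the same weight but see different mini-batches.
shards = [([1.0, 2.0], [2.0, 4.0]), ([3.0], [9.0])]
weights = [0.0, 0.0]
lr = 0.05
for _ in range(3):
    grads = [local_gradient(w, xs, ys) for w, (xs, ys) in zip(weights, shards)]
    grads = allreduce_mean(grads)  # every rank now holds the same gradient
    weights = [w - lr * g for w, g in zip(weights, grads)]
print(weights[0] == weights[1])  # True: replicas stay in sync
```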
5. Train the model.
import time
import torch.nn.functional as F
for epoch in range(0, epochs):
    model.train()
    start = time.time()
    for batch in train_loader:
        optimizer.zero_grad()
        # Only the first batch.batch_size rows are seed nodes; the rest are sampled neighbors.
        out = model(batch.x, batch.edge_index)[:batch.batch_size].log_softmax(dim=-1)
        loss = F.nll_loss(out, batch.y[:batch.batch_size])
        loss.backward()
        optimizer.step()
    end = time.time()
    torch.cuda.empty_cache()  # release cached, unused GPU memory
    torch.distributed.barrier()  # wait until every trainer finishes the epoch
    print(f'-- [Trainer {current_ctx.rank}] Epoch: {epoch:03d}, Loss: {loss:.4f}, Epoch Time: {end - start:.2f}s')
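The slice [:batch.batch_size] relies on the seed nodes occupying the first rows of the sampled subgraph, as in PyG's NeighborLoader. Sampled neighbors only supply messages and do not contribute to the loss. The pure-Python sketch below computes log-softmax followed by mean NLL over the seed rows only. `seed_nll` is a hypothetical helper mirroring `F.nll_loss(out.log_softmax(-1)[:bs], y[:bs])`.

```python
import math


def log_softmax(row):
    """Numerically stable log-softmax of one row of logits."""
    m = max(row)
    lse = m + math.log(sum(math.exp(v - m) for v in row))
    return [v - lse for v in row]


def seed_nll(logits, labels, batch_size):
    """Mean negative log-likelihood over the first batch_size rows (the seed nodes)."""
    rows = [log_softmax(r) for r in logits[:batch_size]]
    return -sum(r[y] for r, y in zip(rows, labels[:batch_size])) / batch_size


# 2 seed nodes followed by 2 sampled neighbors; the neighbors' labels are ignored.
logits = [[2.0, 0.0], [0.0, 2.0], [5.0, -5.0], [-5.0, 5.0]]
labels = [0, 1, 1, 0]
print(round(seed_nll(logits, labels, batch_size=2), 4))  # 0.1269
```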
Run the example code
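Run the example by executing the following commands. Both commands use --master_addr=localhost with disjoint sets of GPUs, so they simulate two nodes on a single machine with four GPUs. On a real cluster, run each command on its own machine, set --master_addr to the address of node 0, and set CUDA_VISIBLE_DEVICES to the GPUs available on that machine.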
# node 0:
CUDA_VISIBLE_DEVICES=0,1 python dist_train_sage_supervised.py \
--num_nodes=2 --node_rank=0 --master_addr=localhost
# node 1:
CUDA_VISIBLE_DEVICES=2,3 python dist_train_sage_supervised.py \
--num_nodes=2 --node_rank=1 --master_addr=localhost
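When simulating more nodes on one host, the per-node commands follow a simple pattern: each simulated node gets a consecutive, disjoint block of GPUs. The sketch below generates those commands. `simulated_launch` is a hypothetical helper, and it reuses only the flags shown above.

```python
def simulated_launch(num_nodes, gpus_per_node, script='dist_train_sage_supervised.py'):
    """Build one launch command per simulated node on a single multi-GPU host."""
    cmds = []
    for node_rank in range(num_nodes):
        first = node_rank * gpus_per_node
        gpus = ','.join(str(g) for g in range(first, first + gpus_per_node))
        cmds.append(
            f'CUDA_VISIBLE_DEVICES={gpus} python {script} '
            f'--num_nodes={num_nodes} --node_rank={node_rank} --master_addr=localhost'
        )
    return cmds


for cmd in simulated_launch(num_nodes=2, gpus_per_node=2):
    print(cmd)
```

With two nodes and two GPUs per node, this reproduces the two commands above, without the line continuations.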
The complete code of the above example can be found in examples/distributed/dist_train_sage_supervised.py.
An example of distributed training in server-client mode can be found in examples/distributed/server_client_mode.