Distributed Neural Network Training in PyTorch

Nilesh Vijayrania
Towards Data Science
6 min read · Dec 25, 2020


With recent advances in deep learning, networks such as giant transformers and wider and deeper ResNets have grown considerably, and so have their memory footprints. More often than not, deep learning practitioners need multiple GPUs to train these networks efficiently. In this post, I will walk you through how distributed neural network training can be set up over a GPU cluster using PyTorch.


Usually, distributed training comes into the picture in two use-cases.

  1. Model Splitting across GPUs: When the model is so large that it cannot fit into a single GPU’s memory, you need to split parts of the model across different GPUs.
  2. Batch Splitting across GPUs: When the mini-batch is so large that it cannot fit into a single GPU’s memory, you need to split the mini-batch across different GPUs.

Model Splitting across GPUs:

Splitting the model across GPUs is straightforward and doesn’t require much code change. While setting up the network itself, parts of the model can be moved to specific GPUs. Afterwards, while forward propagating the data through the network, the data needs to be moved to the corresponding GPU as well. Below is the PyTorch snippet doing the same.
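A minimal sketch of this idea follows. The class name TwoDeviceNet, the layer sizes, and the fallback to CPU when fewer than two GPUs are visible are illustrative assumptions, not the author's original code.

```python
import torch
import torch.nn as nn

# Use two GPUs when available; otherwise fall back to CPU so the sketch still runs.
if torch.cuda.device_count() >= 2:
    DEV0, DEV1 = torch.device("cuda:0"), torch.device("cuda:1")
else:
    DEV0 = DEV1 = torch.device("cpu")


class TwoDeviceNet(nn.Module):
    """Hypothetical toy model whose two halves live on different devices."""

    def __init__(self, in_dim=16, hidden=32, n_classes=4):
        super().__init__()
        # Each part of the model is moved to its own device at construction time.
        self.part1 = nn.Sequential(nn.Linear(in_dim, hidden), nn.ReLU()).to(DEV0)
        self.part2 = nn.Linear(hidden, n_classes).to(DEV1)

    def forward(self, x):
        h = self.part1(x.to(DEV0))     # the input goes to the first device
        return self.part2(h.to(DEV1))  # activations move to the second device


model = TwoDeviceNet()
x = torch.randn(8, 16)
y = torch.randint(0, 4, (8,))

logits = model(x)
# The labels must live on the same device as the output of the last part.
loss = nn.functional.cross_entropy(logits, y.to(logits.device))
loss.backward()
```

Autograd tracks the device transfers, so a single loss.backward() call propagates gradients back through both parts.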

Batch Splitting across GPUs:

There are three ways to split a batch across GPUs.

  1. Accumulating Gradients
  2. Using nn.DataParallel
  3. Using nn.DistributedDataParallel

Accumulating Gradients

The easiest way to handle a batch that does not fit on one GPU is to accumulate gradients. Suppose the batch size we want to train with is 256, but only a batch of 32 fits into one GPU's memory. We can run 8 (= 256/32) forward and backward passes without performing the optimization step, letting each loss.backward() call add its gradients to those already stored. Once we have accumulated the gradients of 256 data points, we perform the optimization step by calling optimizer.step(). Below is the PyTorch snippet for implementing accumulating gradients.
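A minimal sketch of gradient accumulation, assuming a toy linear model and synthetic data (both hypothetical). Dividing each micro-batch loss by the number of accumulation steps is an added assumption: it makes the accumulated gradient match the gradient of the mean loss over all 256 samples.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)

TARGET_BATCH = 256   # batch size we want to emulate
MICRO_BATCH = 32     # largest batch that fits into GPU memory
ACCUM_STEPS = TARGET_BATCH // MICRO_BATCH  # 8 backward passes per optimizer step

model = nn.Linear(10, 1)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
loss_fn = nn.MSELoss()

# Synthetic data, only for illustration.
inputs = torch.randn(512, 10)
targets = torch.randn(512, 1)

optimizer_steps = 0
optimizer.zero_grad()
for i, start in enumerate(range(0, len(inputs), MICRO_BATCH)):
    x = inputs[start:start + MICRO_BATCH]
    y = targets[start:start + MICRO_BATCH]
    # Scale the loss so the summed gradients equal the full-batch mean gradient.
    loss = loss_fn(model(x), y) / ACCUM_STEPS
    loss.backward()  # adds this micro-batch's gradients to .grad
    if (i + 1) % ACCUM_STEPS == 0:
        optimizer.step()       # one update per 256 samples
        optimizer.zero_grad()  # start accumulating the next large batch
        optimizer_steps += 1
```

With 512 samples and micro-batches of 32, the loop runs 16 backward passes but updates the weights only twice.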

Pros:

  • Doesn’t require multiple GPUs for large batch training. This method enables large batch training even with a single GPU.

Cons:

  • Takes significantly more time than training in parallel on multiple GPUs.

Using nn.DataParallel

If you have access to multiple GPUs, it makes sense to hand different batch splits to different GPUs, calculate the gradients on each GPU, and then accumulate the gradients to perform gradient descent.

Figure: Forward and backward pass for multi-GPU training (image credit: Hugging Face)

Basically, the given input is split across the GPUs by chunking in the batch dimension. In the forward pass, the model is replicated on each device, and each replica handles a portion of the batch. During the backward pass, gradients from each replica are summed to produce the resultant gradient, which is applied on the master GPU (GPU-1 in the image above) to update the model weights. In the next iteration, the updated model on the master GPU is again replicated on each GPU device.
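The chunk-and-sum idea can be checked on a single device. The sketch below is only an emulation under stated assumptions: it uses copies of a toy model in place of GPU replicas and a sum-reduced loss so that per-chunk gradients add up exactly. It does not reproduce the threading and scatter/gather machinery that nn.DataParallel uses internally.

```python
import copy

import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(4, 1)
x, y = torch.randn(8, 4), torch.randn(8, 1)
loss_fn = nn.MSELoss(reduction="sum")

# Reference: gradient computed from the full batch in one pass.
loss_fn(model(x), y).backward()
full_grad = model.weight.grad.clone()

# Emulate two "GPUs": each replica sees one chunk along the batch dimension.
summed_grad = torch.zeros_like(full_grad)
for x_chunk, y_chunk in zip(torch.chunk(x, 2, dim=0), torch.chunk(y, 2, dim=0)):
    replica = copy.deepcopy(model)  # stands in for the copy placed on one device
    replica.zero_grad()
    loss_fn(replica(x_chunk), y_chunk).backward()
    summed_grad += replica.weight.grad  # gradients from the replicas are summed

grads_match = torch.allclose(full_grad, summed_grad, atol=1e-5)
```

Because the gradient of a sum is the sum of the gradients, the two chunks together give the same update as the full batch.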

In PyTorch, it takes one line to enable distributed training using nn.DataParallel. The model just needs to be wrapped in nn.DataParallel.

model = torch.nn.DataParallel(model)
...
...
loss = ...
loss.backward()
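For context, a complete training step around that wrapper might look like the sketch below. The toy model, the random data, and the CPU fallback are assumptions made so the example is self-contained; when no GPU is visible, nn.DataParallel simply calls the wrapped module.

```python
import torch
import torch.nn as nn

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model = nn.Sequential(nn.Linear(16, 32), nn.ReLU(), nn.Linear(32, 4)).to(device)
model = nn.DataParallel(model)  # replicates the model on all visible GPUs
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# The batch lives on the master device and is split along dim 0 in forward().
x = torch.randn(64, 16, device=device)
y = torch.randint(0, 4, (64,), device=device)

optimizer.zero_grad()
logits = model(x)  # outputs from all replicas are gathered on `device`
loss = nn.functional.cross_entropy(logits, y)
loss.backward()    # gradients are reduced onto the master copy
optimizer.step()
```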

Pros:

  • Parallelizes NN training over multiple GPUs and hence it reduces the training time in comparison with accumulating gradients.
  • Good for quick prototyping because the code change is minimal.

Cons:

  • nn.DataParallel uses a single-process, multi-threaded approach to train the same model on different GPUs. It keeps the main process on one GPU and runs a separate thread for each of the other GPUs. Since multi-threading in Python is limited by the GIL (Global Interpreter Lock), this prevents a fully parallel training setup.

Using DistributedDataParallel

Unlike nn.DataParallel, DistributedDataParallel uses multi-processing to spawn separate processes on separate GPUs and leverage the full parallelism across GPUs. But setting up a DistributedDataParallel pipeline is more complex than nn.DataParallel and requires the following steps (not necessarily in this order).

  1. Wrap the model in torch.nn.parallel.DistributedDataParallel.
  2. Set up the DataLoader to use a distributed sampler so that each process receives its own shard of the samples. PyTorch provides torch.utils.data.distributed.DistributedSampler for this.
  3. Set up the distributed backend to manage the synchronization of GPUs: torch.distributed.init_process_group(backend='nccl').
    PyTorch provides different backends (nccl, gloo, mpi) for distributed training. As a rule of thumb, use nccl for distributed training over GPUs and gloo for distributed training over CPUs. Read more about them here: https://pytorch.org/tutorials/intermediate/dist_tuto.html#advanced-topics
  4. Launch a separate process on each GPU using the torch.distributed.launch utility. Suppose we have 4 GPUs on the cluster node that we would like to use for distributed training. The following shell command could be used to do that.
python -m torch.distributed.launch --nproc_per_node=4 \
--nnodes=1 --node_rank=0 \
--master_port=1234 train.py <OTHER TRAINING ARGS>

While setting up the launch script, we have to provide a free port (1234 in this case) on the node where the master process runs; this port is used to communicate with the other processes.

Below is the complete PyTorch gist covering all the steps.
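The sketch below puts the four steps together. It is not the author's original gist: the toy model, the synthetic dataset, the fallback to the gloo backend on CPU, and the environment-variable defaults that let it run as a single process without the launcher are all assumptions made for illustration.

```python
import argparse
import os
import socket

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

parser = argparse.ArgumentParser()
# torch.distributed.launch passes --local_rank (--local-rank in newer releases).
parser.add_argument("--local_rank", "--local-rank", type=int, default=0)
args, _ = parser.parse_known_args()

# Assumption: defaults so the sketch also runs as one plain process.
# The launcher normally sets these environment variables itself.
if "MASTER_PORT" not in os.environ:
    with socket.socket() as s:
        s.bind(("127.0.0.1", 0))  # ask the OS for a free port
        os.environ["MASTER_PORT"] = str(s.getsockname()[1])
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("RANK", "0")
os.environ.setdefault("WORLD_SIZE", "1")

# Step 3: set up the distributed backend (nccl for GPUs, gloo for CPUs).
use_cuda = torch.cuda.is_available()
dist.init_process_group(backend="nccl" if use_cuda else "gloo", init_method="env://")
device = torch.device(f"cuda:{args.local_rank}" if use_cuda else "cpu")
if use_cuda:
    torch.cuda.set_device(device)

# Step 1: wrap the model in DistributedDataParallel.
model = nn.Linear(16, 4).to(device)
model = DDP(model, device_ids=[args.local_rank] if use_cuda else None)

# Step 2: give every process its own shard of the (synthetic) dataset.
dataset = TensorDataset(torch.randn(256, 16), torch.randint(0, 4, (256,)))
sampler = DistributedSampler(dataset)
loader = DataLoader(dataset, batch_size=32, sampler=sampler)

optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
for epoch in range(2):
    sampler.set_epoch(epoch)  # shuffle differently in every epoch
    for x, y in loader:
        optimizer.zero_grad()
        loss = nn.functional.cross_entropy(model(x.to(device)), y.to(device))
        loss.backward()  # DDP averages gradients across processes here
        optimizer.step()

final_loss = loss.item()
dist.destroy_process_group()

# Step 4 (single node, 4 GPUs):
# python -m torch.distributed.launch --nproc_per_node=4 --nnodes=1 --node_rank=0 --master_port=1234 train.py
```

Each process runs this same script; only the rank and the data shard it sees differ.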

Notice that the above-mentioned utility call (the last commented line in the above gist) is for a single node of a GPU cluster. For a multi-node setup, you additionally have to choose one node as the master node and provide the master_addr argument when setting up the launch utility. Suppose we have 2 nodes with 4 GPUs each, and the first node, with the IP address “192.168.1.1”, is the master node. We have to start the launch script on each node separately as follows.

On first node, run

python -m torch.distributed.launch --nproc_per_node=4 \
--nnodes=2 --node_rank=0 \
--master_addr="192.168.1.1" --master_port=1234 train.py <OTHER TRAINING ARGS>

On second node, run

python -m torch.distributed.launch --nproc_per_node=4 \
--nnodes=2 --node_rank=1 \
--master_addr="192.168.1.1" --master_port=1234 train.py <OTHER TRAINING ARGS>

Other Utility Functions:

While evaluating the model or generating logs, it is often necessary to collect current batch statistics such as losses and accuracy from all GPUs and collate them on one machine for logging. PyTorch provides the following methods for syncing variables across all the GPUs.

  • torch.distributed.gather(input_tensor, gather_list, dst): Collect the specified input_tensor from all processes and place the copies in gather_list on the dst process.
  • torch.distributed.all_gather(tensor_list, input_tensor): Collect the specified input_tensor from all processes and place the copies in tensor_list on every process.
  • torch.distributed.reduce(input_tensor, dst, op=ReduceOp.SUM): Combine the input_tensor from all processes using the specified reduce operation, such as sum, product, min, or max. The final result is written into input_tensor on the dst process.
  • torch.distributed.all_reduce(input_tensor, op=ReduceOp.SUM): Same as the reduce operation, but the final result is written into input_tensor on every process.

For more details about parameters and methods, read the torch.distributed documentation: https://pytorch.org/docs/stable/distributed.html

As an example, the code below takes the loss values from all GPUs and reduces them to the master device (cuda:0).

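# In continuation with distributedDataParallel.py above
def get_reduced_loss(loss, dest_device):
    # dest_device is the rank of the process that receives the summed loss.
    loss_tensor = loss.clone()
    torch.distributed.reduce(loss_tensor, dst=dest_device)
    return loss_tensor

# reduce() is a collective call, so every process must execute it.
loss_tensor = get_reduced_loss(loss.detach(), 0)
if args.local_rank == 0:
    # With the default ReduceOp.SUM this is the sum of the per-GPU losses.
    print(f'Current batch Loss = {loss_tensor.item()}')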
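When every process needs the statistic, for example to make the same early-stopping decision everywhere, all_reduce and all_gather can be used instead. The sketch below is self-contained under a few assumptions: the helper names are hypothetical, the gloo backend is used with CPU tensors, and the environment-variable defaults let it run as a single process without the launcher.

```python
import os
import socket

import torch
import torch.distributed as dist

# Assumption: stand-alone setup so the sketch runs as one process.
# A launcher would normally set these environment variables.
if "MASTER_PORT" not in os.environ:
    with socket.socket() as s:
        s.bind(("127.0.0.1", 0))  # ask the OS for a free port
        os.environ["MASTER_PORT"] = str(s.getsockname()[1])
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("RANK", "0")
os.environ.setdefault("WORLD_SIZE", "1")
if not dist.is_initialized():
    dist.init_process_group(backend="gloo", init_method="env://")


def average_across_processes(value):
    """Hypothetical helper: mean of `value` over all processes."""
    tensor = value.detach().clone()
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)  # every rank receives the sum
    return tensor / dist.get_world_size()


def gather_on_every_process(value):
    """Hypothetical helper: list holding each process's copy of `value`."""
    tensor = value.detach().clone()
    out = [torch.zeros_like(tensor) for _ in range(dist.get_world_size())]
    dist.all_gather(out, tensor)  # out[i] holds the tensor from rank i
    return out


local_loss = torch.tensor([0.5])  # stands in for this process's batch loss
mean_loss = average_across_processes(local_loss)
all_losses = gather_on_every_process(local_loss)
dist.destroy_process_group()
```

Both helpers are collective calls as well, so every process has to reach them in the same order.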

Pros:

  • The same code setup could be used for a single GPU without any code change. A single GPU setting would require only the launch script with the appropriate setting.

Cons:

  • Layers such as BatchNorm, which use whole-batch statistics in their computations, can’t carry out the operation independently on each GPU using only a split of the batch. PyTorch provides SyncBatchNorm as a replacement/wrapper module for BatchNorm that calculates the batch statistics over the whole batch divided across GPUs. See the sample code below for usage of SyncBatchNorm.
network = ....  # some network with BatchNorm layers in it
sync_bn_network = nn.SyncBatchNorm.convert_sync_batchnorm(network)
ddp_network = nn.parallel.DistributedDataParallel(
    sync_bn_network,
    device_ids=[args.local_rank], output_device=args.local_rank)
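To see what the conversion does, the sketch below applies it to a small hypothetical network and inspects the layer types. Only the conversion is shown; it runs on CPU, whereas a training forward pass through SyncBatchNorm still needs an initialized process group.

```python
import torch.nn as nn

# Hypothetical network with one BatchNorm layer.
network = nn.Sequential(
    nn.Conv2d(3, 8, kernel_size=3, padding=1),
    nn.BatchNorm2d(8),
    nn.ReLU(),
)

# Recursively swaps every BatchNorm layer for SyncBatchNorm, keeping its parameters.
sync_bn_network = nn.SyncBatchNorm.convert_sync_batchnorm(network)
layer_types = [type(m).__name__ for m in sync_bn_network.children()]
```

After the call, layer_types lists SyncBatchNorm in place of BatchNorm2d, while the convolution and activation are left untouched.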

Summary

  • For splitting the model across GPUs, split the model into sub_modules and push each sub_module to a separate GPU.
  • For splitting the batch across GPUs, use one of accumulating gradients, nn.DataParallel, or nn.DistributedDataParallel.
  • For quick prototyping, nn.DataParallel could be preferred.
  • For training a large model and to leverage full parallel training across multiple GPUs, nn.DistributedDataParallel should be used.
  • While using nn.DistributedDataParallel, replace or wrap nn.BatchNorm layers with nn.SyncBatchNorm.

Below is the list of references used for writing this post.

  1. https://medium.com/huggingface/training-larger-batches-practical-tips-on-1-gpu-multi-gpu-distributed-setups-ec88c3e51255
  2. https://pytorch.org/docs/stable/distributed.html
