How to scale training on multiple GPUs

How to train a PyTorch model in multiple GPUs

Giuliano Giacaglia
9 min readDec 22, 2019

--

One of the biggest problems with Deep Learning models is that they are becoming too big to train in a single GPU. If the current models were trained in a single GPU, they would take too long. In order to train models in a timely fashion, it is necessary to train them with multiple GPUs.

We need to scale training methods to use 100s of GPUs or even 1000s of GPUs. For example, a famous researcher was able to reduce the ImageNet training time from 2 weeks to 18 minutes, or train the largest and the state of the art Transformer-XL in 2 weeks instead of 4 years. He used 100s of GPUs to do that.

We care deeply about our training iteration speeds. So in order to increase our iteration speeds, we’ve had to scale up our training to multiple GPUs. In this blog post, I will go over how to scale up training with PyTorch. We’ve had some models in TensorFlow (<2.0) and scaled our training, using Horovod, a tool created by Uber Engineering team. If you go down that path, we recommend installing it using their Docker Image.

We find that PyTorch has the best balance between ease of use and control, without giving up performance. PyTorch built two ways to implement distribute training in multiple GPUs: nn.DataParalllel and nn.DistributedParalllel. They are simple ways of wrapping and changing your code and adding the capability of training the network in multiple GPUs.

nn.DataParallel is easier to use, but it requires its usage in only one machine. nn.DataParalllel only uses one process to compute model weights and distribute them to each GPU during each batch.

In this blog post, I will go into detail how nn.DataParallel and nn.DistributedDataParalllel work. I will cover the main differences between the two, and how training in multiple GPUs works. I will first explain how the training a neural network works.

Training loop

First, let’s go over how training a neural network usually works. For this we will use some images created by HuggingFace:

There are four main steps for each loop that happens when training a neural network:

  1. The forward pass, where the input is processed by the neural network
  2. The loss function is calculated, comparing the predicted label with the ground-truth label
  3. The backward pass is done, calculating the gradients for each parameter based on the loss (using back-propagation)
  4. The parameters are updated using the gradients

For batch sizes greater than one, we might want to batch normalize the training. For an in-depth explanation of batch normalization I recommend going over this blog post:

DataParallel

DataParallel helps distribute training into multiple GPUs in a single machine. Let’s go into detail how DataParallel works. There are a few steps that happen whenever training a neural network using DataParallel:

Image created by HuggingFace
  1. The mini-batch is split on GPU:0
  2. Split and move min-batch to all different GPUs
  3. Copy model out to GPUs
  4. Forward pass occurs in all different GPUs
  5. Compute loss with regards to the network outputs on GPU:0, and return losses to the different GPUs. Calculate gradients on each GPU
  6. Sum up gradients on GPU:0 and use the optimizer to update model on GPU:0

A Simple example

Let’s code this up. First, let’s import everything we need

We define a very simple convolutional model for predicting MNIST

Line 4–14: We are defining the layers in this neural network.

Line 16–21: We define the forward pass

The main() function will take in some arguments and run the training function:

Line 2–6: We instantiate the model and set it to run in the specified GPU, and run our operations in multiple GPUs in parallel by using DataParallel.

Line 9–23: We define the loss function (criterion), and the optimizer (in this case we are using SGD). We define the training data set (MNIST) and the loader of the data.

Line 24–45: That’s where the loop for training the neural network happens. We load the inputs and the expected outputs. We run the forward pass and the backward pass and the optimizer.

There’s definitely some extra stuff in here (the number of GPUs and nodes, for example) that we don’t need yet, but it’s helpful to put the whole skeleton in place.

DistributedDataParallel

For ‘nn.DistributedDataParallel’, the machine has one process per GPU, and each model is controlled by each process. The GPUs can all be on the same node or across multiple nodes. Only gradients are passed between the processes/GPUs.

During training, each process loads its own mini-batch from disk and passes it to its GPU. Each GPU does its forward pass, then the gradients are all-reduced across the GPUs. Gradients for each layer do not depend on previous layers, so the gradient all-reduce is calculated concurrently with the backwards pass to further alleviate the networking bottleneck. At the end of the backwards pass, every node has the averaged gradients, ensuring that the model weights stay synchronized.

Tutorial

To do this with multiprocessing, we need a script that will launch a process for every GPU. Each process needs to know which GPU to use, and where it ranks amongst all the processes that are running. We’ll need to run the script on each node.

Let’s take a look at the changes to each function. I’ve fenced off the new code to make it easy to find.

Let’s go over the arguments of the main function:

  • args.nodes is the total number of nodes we are using (number of machines).
  • args.gpus is the number of GPUs on each node (on each machine).
  • args.nr is the rank of the current node (machine) within all the nodes (machines), and goes from 0 to args.nodes - 1.

Let’s go through the new changes line by line:

Line 12: Based on the number of nodes and GPUs per node, we can calculate the world_size, or the total number of processes to run, which is equal to the total number of GPUs times the number of nodes.

Line 13: This tells the multiprocessing module what IP address to look at for process 0. It needs this so that all the processes can sync up initially. This needs to be the same across all nodes.

Line 14: Likewise, this is the port to use when looking for process 0.

Line 15: Now, instead of running the train function once, we will spawn args.gpus processes, each of which runs train(i, args), where i goes from 0 to args.gpus - 1. Remember, we run the main() function on each node, so that in total there will be args.nodes * args.gpus = args.world_size processes.

Instead of lines 13 and 14, I could have run export MASTER_ADDR=10.57.23.164 and export MASTER_PORT=8888 in the terminal.

Next, let’s look at the modifications to train. I’ll fence the new lines again.

I’ve removed some of thee code and replaced it with ..., to make this tutorial easier to read, but if you want the full script, it is here .

Line 3: This is the global rank of the process within all of the processes. We’ll use this for line 6.

Lines 4–6: Initialize the process and join up with the other processes. This is “blocking,” meaning that no process will continue until all processes have joined. I’m using the NCCL, since its the fastest available.. The init_method tells the process group where to look for some settings. In this case, it’s looking at environment variables for the MASTER_ADDR and MASTER_PORT, which we set within main. That’s why we set it to env://. We could have set the world_size there as well as WORLD_SIZE.

Line 23: Wrap the model as a DistributedDataParallel model. This reproduces the model onto each GPU.

Lines 35–39: The nn.utils.data.DistributedSampler makes sure that each process gets a different slice of the training data, whenever loading the data. If you want to debug and verify that each GPU is loading the right data, you can calculate the SHAs of the tensors loaded into each GPU.

Lines 46 and 51: Use the nn.utils.data.DistributedSampler instead of shuffling the usual way. That’s why we set shuffle to false.

To run this on, say, 4 nodes with 8 GPUs each, we need 4 terminals (one on each node). On node 0 (as set by line 13 in main):

Then, on the other nodes:

for i∈1,2,3. In other words, we run this script on each node, telling it to launch args.gpus processes that sync with each other before training begins.

Note that the effective batch_size is now the per/GPU batch_size (the value in the script) * the total number of GPUs (the world size).

Problems

There are a few problems that might occur whenever running the same model in a few GPUs instead of one GPU. The biggest problem that can occur is that the main GPU may run out of memory. The reason for that is because the first GPU will save all the different outputs for the different GPUs to calculate the loss.

The following message will be displayed on the console whenever you are training the network: ran out of memory trying to allocate 2.59GiB

In order to solve this problem, and reduce the amount of memory usage, we use two techniques:

  1. Reduce the batch_size
  2. Use Apex for mixed precision

The first technique is pretty straightforward, and usually involves just changing one hyper-parameter.

The second technique means that we are going to decrease the precision of the weights that are used in the neural network, and therefore use less memory. Mixed-precision means you use 16-bit for certain things but keep things like weights at 32-bit. To learn more about mixed precision, I recommend reading this blog post:

Apex for mixed precision

In order to solve the problem of running out of memory, we recommend using lower precision numbers. That allows us to use larger batch sizes and take advantage of NVIDIA Tensor Cores for faster computation.

In order to make APEX work, we need to change 2 parts of the code. The first is inside the train loop inside the codebase:

Training step

Line 18: amp.initialize wraps the model and optimizer for mixed precision training. Note that that the model must already be on the correct GPU before calling amp.initialize. The opt_level goes from O0, which uses all floats, through O3, which uses half-precision throughout. O1 and O2 are different degrees of mixed-precision, the details of which can be found in the Apex documentation.

Line 20: apex.parallel.DistributedDataParallel is a drop-in replacement for nn.DistributedDataParallel. We no longer have to specify the GPUs because Apex only allows one GPU per process. It also assumes that the script calls torch.cuda.set_device(local_rank)(line 10) before moving the model to GPU.

Lines 37–38: Mixed-precision training requires that the loss is scaled in order to prevent the gradients from underflowing. Apex does this automatically.

Make sure that whenever you initialize AMP, you set opt_level=O1, due to a bug with its implementation

Checkpoint

We need to change the way we save and load models whenever using Apex, See the following issue. And we need to change the way we save checkpoints and load them to our models:

Line 5: We add the amp.state_dict to the checkpoint

Line 19: We load the state_dict to amp here.

Conclusion

With all of that, you should be able to start training your model in multiple GPUs. We recommend start training a small model in one GPU before trying to scale training to multiple GPUs. But this tutorial may help if there is a need to scale training.

Links and references:

https://lambdalabs.com/blog/introduction-multi-gpu-multi-node-distributed-training-nccl-2-0/

https://medium.com/intel-student-ambassadors/distributed-training-of-deep-learning-models-with-pytorch-1123fa538848

https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da

https://medium.com/huggingface/training-larger-batches-practical-tips-on-1-gpu-multi-gpu-distributed-setups-ec88c3e51255

https://medium.com/south-park-commons/scaling-transformer-xl-to-128-gpus-85849508ec35

https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html

--

--