How to Reduce the Training Time of Your Neural Network from Hours to Minutes

Part 2 of the series on AI with HPC: parallelising a CNN with Horovod and GPUs to obtain a 75x-150x speed-up.

Bhaskar Agarwal
Towards Data Science



In part 1 of the series we looked at how it is possible to get a ~1500x speed-up in IO operations with a few lines of Python using the multiprocessing module. In this article, we will look at parallelising a deep learning code and reducing the training time from roughly 13 hours to 13 minutes!

The Context

As a data scientist, you will eventually face the following problem (if you haven’t faced it already).

“I have a neural network to train but the input data doesn’t fit in memory!” or “My neural network takes forever to train with this amount of data!”

It would surely be a pity to exclude a substantial part of your data from training, or to wait for hours (even days) for your neural network to finish training. There has to be a better way, and there is: it is called data parallelism.

The Problem

For this post, we will use a modified version of the PepsiCo Lab potato chips control dataset available on Kaggle. The original images are in a 2000 x 2000 pixel format with 3 channels. I modify the images by scaling them to 500 x 500 and applying 5 transformations (a vertical flip, a horizontal flip and three 90-degree rotations), creating 6 times as many images as in the original dataset.

The two types of chip to distinguish in this post.
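As a rough illustration of this preprocessing (a sketch only; the repository may implement it differently, and the use of TensorFlow image ops here is my own choice), the six variants of each image could be generated as follows:

import tensorflow as tf

def make_variants(image_path):
    # read the original image and scale it down to 500 x 500
    raw = tf.io.read_file(image_path)
    img = tf.image.resize(tf.image.decode_jpeg(raw, channels=3), (500, 500))
    return [
        img,                            # scaled original
        tf.image.flip_up_down(img),     # vertical flip
        tf.image.flip_left_right(img),  # horizontal flip
        tf.image.rot90(img, k=1),       # 90 degree rotation
        tf.image.rot90(img, k=2),       # 180 degree rotation
        tf.image.rot90(img, k=3),       # 270 degree rotation
    ]                                   # 6 images per original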

I then wrote a simple convolutional neural network (CNN) code for classification based on the input data I created from the original dataset. All the material can be found here.

The CNN used in this post, visualised using visualkeras. Note that the first layer is not to scale (for the sake of aesthetics), but it gives an idea of the tiny CNN I created for this problem.

The resources and the repository

For this article we will be using:

  • TensorFlow 2 and Keras
  • Horovod
  • Python 3.8+
  • A computer with at least 1 GPU
  • All the codes and data can be found in this repository.

Data Parallelism

In data parallelism, one deploys the same model on different processing units (think GPUs), each trained on a unique subset of the input data. During training, the gradients computed by each processing unit are collected, averaged and communicated back, so that every unit applies the same update to its copy of the model. This way, each processing unit trains the same model with its own unique subset of the data (thus a fraction of it), but the model parameters are updated with insight into the entire input dataset.
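To make the idea concrete, here is a minimal, framework-free sketch of data-parallel training (plain NumPy, with an invented placeholder for the per-worker gradient; in practice a framework such as Horovod performs the averaging with an efficient allreduce):

import numpy as np

def local_gradient(weights, data_subset):
    # placeholder for the gradient each worker computes on its own data
    return np.mean(data_subset, axis=0) - weights

weights = np.zeros(3)                        # the same initial model everywhere
subsets = np.split(np.random.rand(8, 3), 2)  # 2 workers, disjoint data subsets

for step in range(10):
    grads = [local_gradient(weights, s) for s in subsets]  # computed in parallel
    avg_grad = np.mean(grads, axis=0)                       # allreduce-style average
    weights += 0.1 * avg_grad                               # identical update on every worker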

How can I use this approach with neural networks?

If you use TensorFlow, PyTorch or any of the popular deep learning frameworks, you can use Horovod for distributed training. Horovod was developed by Uber to bring the ideas of HPC to deep learning. Installation is fairly straightforward and I won't go into the details, as their GitHub page explains it well. For the remainder of this post, we will talk about how one can data-parallelise a TensorFlow code using Horovod.

Think of Horovod as a plug-in for your existing code. The steps involved in Horovod-ising your code follow the idea of data parallelism, so before anything else you must fully understand how your code will be executed with Horovod. You must then decide on a way to distribute your data. Once this is done, you can start implementing Horovod in your code.

To me, it is easier to approach this subject in the following steps.

  1. Understand how your code will be executed with Horovod, i.e. horovodrun
  2. Make sure you have a good data distribution strategy
  3. Initialise Horovod in your code
  4. Make sure your optimizer is distributed
  5. Broadcast your variables from the coordinator and fix your callbacks

We will now discuss each of these points in some detail.

1. How Horovod operates

Horovodrun wraps some essential Message Passing Interface (MPI) commands and concepts into a higher-level API; if you are familiar with MPI, you will find many similarities under the hood. On a system with n GPUs, one would execute a Horovod-enabled CNN code as

horovodrun -np n python cnn_parallel.py

Codes that have been modified with Horovod need to be executed with either horovodrun or mpirun. In the data parallelism framework, with n GPUs one creates n tasks, each running your modified CNN code with Python on a unique subset of the data. The image below illustrates this for a machine with 2 GPUs: the input dataset D is split into two subsets a and b that have no elements in common.

Understanding how Horovod executes your code in parallel.

The important thing to remember here is that your entire Python code will be run n times, where n is the number of GPUs. Thus every print command, every IO operation, every plotting section, etc. will be executed n times. This can be confusing at first, but the good part is that every on-screen message (e.g. print(…)) comes with its own GPU identifier.

Horovod comes with some useful calls (similar to the ones defined by the MPI standard) that describe the architecture of the system on which the code is running. hvd.size() returns the total number of GPUs in use, hvd.local_rank() returns the rank of the GPU within its node, and hvd.rank() returns the global rank of the GPU across all nodes. The rank can be thought of as an id for that GPU. The image below explains the idea better.

Understanding the size and ranks of GPUs in single node vs. multi-node setup.
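As a hedged illustration (the exact numbers depend on how you launch the code and on your hardware), this is what the three calls would return:

import horovod.tensorflow.keras as hvd

hvd.init()
# launched with `horovodrun -np 2 python script.py` on one node with 2 GPUs:
#   hvd.size()       -> 2         total number of processes (GPUs)
#   hvd.rank()       -> 0 or 1    global id of this process
#   hvd.local_rank() -> 0 or 1    id of this process within its node
# launched on two nodes with 2 GPUs each (4 processes in total):
#   hvd.size()       -> 4
#   hvd.rank()       -> 0, 1, 2 or 3
#   hvd.local_rank() -> 0 or 1 on each node
print(hvd.size(), hvd.rank(), hvd.local_rank())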

2. Data distribution strategy

So why must you distribute the data? To make full use of data parallelism, each GPU must see a unique subset of the data. If you instead pass the same dataset to every GPU, you are training the same model n times with the same samples, or in other words running your code in exactly the same way n times. This is not what we want to achieve with data parallelism.

There are many strategies one can adopt to distribute the data correctly. PyTorch, TensorFlow, etc. have built-in data distribution methods, but the right strategy will depend on how your data is structured and how much of it there is.

Let us create some data distribution strategies with a simple example. Assume that our data comprises all integers from 1 to 100 in increasing order, without repetition. We want to distribute this data on 2 GPUs.

Option 1: TensorFlow Data Generator

We will employ the idea of distributing batches over the GPUs randomly at the start of training. We will trick the fitting call in TensorFlow into assigning 1/n of the batches to each GPU using the ‘steps_per_epoch’ keyword. This method works well with a data generator, in our case the image data generator. We keep the same batch size as we had when training without any GPUs.

Data distribution with a TensorFlow data generator.
img_gen = tf.keras.preprocessing.image.ImageDataGenerator(
    validation_split=split)
train_dataset = img_gen.flow_from_directory(
    directory=train_data_path,
    target_size=(img_height, img_width),
    color_mode='rgb',
    batch_size=batch_size,
    class_mode='sparse',
    seed=123,
    shuffle=True,
    subset='training')
val_dataset = img_gen.flow_from_directory(
    directory=...,
    subset='validation')

total_train_batches = train_dataset.samples // batch_size
total_val_batches = val_dataset.samples // batch_size

Then when we perform the fitting, we instruct TensorFlow to only use 1/n of the total batches for training and validation. This is because the training will happen on each GPU, and by specifying the ‘steps_per_epoch’ as total_batches/n, we instruct TensorFlow to split the batches on the two GPUs such that each GPU gets a disjoint subset of batches from our data generator.

history = model.fit(
    train_dataset,
    epochs=epochs,
    steps_per_epoch=total_train_batches // hvd.size(),
    validation_data=val_dataset,
    validation_steps=total_val_batches // hvd.size(),
    ...
)

Option 2: TensorFlow data.Dataset object

We will employ the concept of ‘sharding’: when a dataset is split into q shards, shard i contains every q-th element starting from the i-th. The pseudo-code below should explain it better.

D = [1,2,...,99,100]

# split into 2 shards
shard_1 = [1,3,5,...]
shard_2 = [2,4,6,...]

# split into 3 shards
shard_1 = [1,4,7,...]
shard_2 = [2,5,8,...]
shard_3 = [3,6,9,...]

Below I describe two different ways of doing this, using a batch size of 4. Note that these are not the only strategies one may adopt to distribute the data, and you are free to devise your own.

  • method A: shard the dataset 2 times → batch it
  • method B: shuffle the dataset → shard the dataset 2 times → batch it

From a machine learning point of view, can you reason why only one of these might be the right way?

D = [1,2,...,99,100]

# method A
shard_1 = [1,3,5...]
shard_2 = [2,4,6...]
shard_1_batch = [[1,3,5,7],[9,11,13,15]...]  # split 1
shard_2_batch = [[2,4,6,8],[10,12,14,16]...] # split 2

# method B
D_shuffle = [63,74,22,26,36,14,34,94,60,42,56,17,65,1,12,51...]
shuffle_shard_1 = [63,22,36,34,60,56,65,12]
shuffle_shard_2 = [74,26,14,94,42,17,1,51]
shuffle_shard_1_batch = [[63,22,36,34],[60,56,65,12]...]  # split 1
shuffle_shard_2_batch = [[74,26,14,94],[42,17,1,51]...]   # split 2

With method A, all odd numbers end up in split 1 and all even numbers in split 2. Clearly, method B is better, as the resulting splits have a fair representation (and distribution) of even and odd numbers.
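The two methods are easy to reproduce with a tf.data pipeline. The snippet below is only an illustration: the shard arguments are hard-coded to 2 shards and shard index 0, whereas in the Horovod code they would be hvd.size() and hvd.rank().

import tensorflow as tf

D = tf.data.Dataset.range(1, 101)   # the integers 1..100

# method A: shard -> batch (shard 0 contains only odd numbers)
method_A = D.shard(num_shards=2, index=0).batch(4)

# method B: shuffle -> shard -> batch (shard 0 contains a mix of odd and even)
method_B = D.shuffle(100, seed=123).shard(num_shards=2, index=0).batch(4)

for batch in method_A.take(2):
    print(batch.numpy())   # [1 3 5 7], [ 9 11 13 15]
for batch in method_B.take(2):
    print(batch.numpy())   # shuffled batches; the exact values depend on the seed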

Now think about the input for a CNN. The input folder would typically contain thousands (if not millions) of images to train on, generally grouped into different classes (subfolders). When you create a TensorFlow dataset from a folder of images, it infers the classes from the directory structure. The snippet below should clarify the point.

input_folder
|-train
| |-class_1
| | |-image_1.jpg
| | |-...
| | |-image_100.jpg
| |-class_2
| | |-image_1.jpg
| | |-...
| | |-image_50.jpg
| |-class_3
| | |-image_1.jpg
| | |-...
| | |-image_500.jpg

Thus, once such a folder structure has been passed to TensorFlow, it automatically maps the filenames to their respective classes, ready to be processed and passed to the CNN for training. Note that, by default, image_dataset_from_directory shuffles the filenames; in other words, your dataset is already shuffled if you use this function to ingest the images.

Data distribution using sharding.
import tensorflow.keras.preprocessing as tkp

local_bs = ..  # will be scaled with hvd.size(); we will discuss this later
X_train = tkp.image_dataset_from_directory(..., batch_size=1)
train_dataset = (X_train
                 .unbatch()
                 .shard(hvd.size(), hvd.rank())
                 .batch(local_bs)
                 .cache())

The shard() call on the dataset takes the number of shards to create (the number of GPUs) and the index of the shard (the id of the GPU) to produce a unique subset of the data. Using hvd.size() and hvd.rank() as inputs to this call, we can easily manage the data splits that will be processed by each GPU. Up to this point, train_dataset is only a lazily defined pipeline and does not yet exist in memory; it will be materialised when one finally trains the model with the command below.

model.fit(train_dataset,...)

Note that here we used the local_bs, instead of the original batch size. I will explain the reason shortly.

Training the model after data distribution

Once we have decided on and implemented the data distribution strategy correctly, we are ready to train the CNN. At this point, with 2 GPUs, the data distribution strategy is put into execution as shown below.

The data distribution strategy being executed when the CNN is trained.

Batch size & learning rate (Option 2 in data distribution strategy)

If you have used a neural network (NN), you must be familiar with the concepts of learning rate and batch size. In a data-distributed framework one might need to scale these correctly in order to compensate for the change in our training strategy. Since each GPU is now training the model with a unique subset of the data, the number of samples the NN effectively processes per update is no longer the same as the batch size you set. Note that you might not need to perform this scaling, depending on how you decide to distribute your data and how your code is written. In my experience, Option 2 above for data distribution generally works better if one scales the batch size and/or the learning rate accordingly. Basically, if you find strange loss and accuracy curves after implementing data parallelism, this might be something to keep in mind.

Let us go back to Option 2 for data distribution. We have 2 GPUs and the dataset comprises 100 integers, with a batch size of 4. Each GPU gets 50 unique integers grouped into batches of size 4, and carries out training by processing its own 13 batches (the last batch has only 2 elements). At every update, the contributions of the GPUs are passed to Horovod for aggregation: Horovod combines the contribution of GPU 0, which processed a batch of 4 samples, with that of GPU 1, which also processed a batch of 4, so effectively the batch size is 8! In other words, whenever Horovod updates the model parameters, the model has technically ‘seen’ a batch of 8 unique samples. The image below should explain the point better.

Data distributed training of the CNN with two GPUs. Horovod (orange) aggregates the contributions from each GPU (teal and fuchsia) and updates the CNN model, which is then ready for the next round of training.

In the case where we do not change the batch size, i.e. we keep it fixed at the same value as in the non-data-distributed version of the code, we must scale the learning rate linearly* with the total number of GPUs. This is because, with more samples in the effective batch, the algorithm can take larger steps in its search for the minimum of the loss function. In other words, the gradient estimate is less noisy (more samples), so we can afford a larger jump in parameter space.

If we instead wish to keep the learning rate fixed, i.e. the same as in the non-data-distributed version of the code, we must scale the batch size inversely with the number of GPUs. Let us take our case of integers: with 2 GPUs, if I batch each shard of my data with a batch size of 2, then the effective batch size that Horovod sees at each update is 4. Thus, I can maintain the same learning rate as I had before data distribution.

To summarise:

############################################ THEORY
# original batch size: batch_size
# local batch size: local_bs (batch size on each GPU)
# original learning rate: lr
# scaled learning rate: scaled_lr
## option 1:
# local_bs = batch_size : therefore the effective batch size is larger
# therefore scaled_lr = lr * n_gpus
## option 2:
# scaled_lr = lr
# therefore local_bs = batch_size / n_gpus
# now the effective batch size is the same as before
############################################
############################################ IMPLEMENTATION
# pick one below:
# Option 1:
# local_bs = batch_size
# scaled_lr = base_lr * hvd.size()
# OR
# Option 2:
local_bs = int(batch_size / hvd.size())
scaled_lr = base_lr

*Sometimes, instead of a strictly linear scaling, a square-root scaling or a 2/3-power scaling of the learning rate works better.
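For reference, the scalings mentioned above boil down to a one-line change (a sketch with an assumed base learning rate; in the Horovod code, n_gpus would be hvd.size()):

base_lr = 1e-3   # assumed example value
n_gpus = 4

linear_lr = base_lr * n_gpus              # linear scaling
sqrt_lr   = base_lr * n_gpus ** 0.5       # square-root scaling
pow23_lr  = base_lr * n_gpus ** (2 / 3)   # 2/3-power scaling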

3. Initialising Horovod in your code

The first step is telling your code you will be using Horovod.

import horovod.tensorflow.keras as hvd

hvd.init()  # this initialises Horovod

print('Total GPUs available:', hvd.size())
print('Hi I am a GPU and my rank is:', hvd.rank())

The next step is to pin each available GPU to a ‘process’ using the GPU’s local rank as an identifier.

# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

4. Distribute the optimizer

This is where the aggregation of the gradients computed by each GPU takes place. Horovod makes this very easy, as we only need to add one line to our code to distribute the optimizer.

opt = keras.optimizers.Adam(learning_rate=learning_rate)

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)
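For completeness, a minimal sketch of how the wrapped optimizer would then be passed to the Keras model (the model name refers to the CNN defined earlier, and the sparse loss is an assumption based on the class_mode='sparse' generators above; this is not necessarily the exact call used in the repository):

model.compile(loss='sparse_categorical_crossentropy',
              optimizer=opt,
              metrics=['accuracy'])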

5. Fix the callbacks

The next step is to make sure that all GPUs start from, and remain in, a consistent state, and that they do not corrupt each other's outputs. We therefore declare one of the GPUs, usually the one with rank 0, to be the ‘coordinator’. It broadcasts the initial model parameters to the other GPUs so that every worker starts training from identical weights, and it is the only worker allowed to save checkpoints, which prevents the others from corrupting them. Remember that the coordinator also trains the CNN with its own subset of the data.

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to
    # all other processes. This is necessary to ensure consistent
    # initialization of all workers when training is started with
    # random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),

    # Horovod: average metrics among workers at the end of every
    # epoch.
    hvd.callbacks.MetricAverageCallback(),
]

# Horovod: save checkpoints only on worker 0 to prevent other
# workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(keras.callbacks.ModelCheckpoint('...h5'))
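Putting the pieces together, the final training call could look like the sketch below (model, train_dataset, val_dataset and epochs are the names used earlier and are assumed here; the verbose trick simply silences the progress bar on all workers except the coordinator):

verbose = 1 if hvd.rank() == 0 else 0   # print progress only on the coordinator
history = model.fit(train_dataset,
                    epochs=epochs,
                    callbacks=callbacks,
                    validation_data=val_dataset,
                    verbose=verbose)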

Running the code

We are now ready to run the code. If you are using a computer with just 1 GPU, you may execute the command from a terminal as

horovodrun -np 1 python python_code/horovod_dist_gen.py

If instead we were to run it on a supercomputer, one would need to write a script that submits our ‘job’, where a job is a piece of code to be run with a given combination of resources. Below is an example of a script where I run the code on a machine with 4 GPUs.

The slurm script to run the CNN code using GPUs with Horovod.

What is the result?

Time with no GPU: 14 hours

Time with 1 GPU: 12 minutes

For the data distribution approach using shards, we obtain a 75x speed-up using 1 GPU and a 150x speed-up using 2 or more GPUs.

Speed up obtained after implementing Horovod in our CNN code and running it with 1, 2 and 4 GPUs.

Data distributed as shards performs better than the generator approach. The speed-up from 1 GPU to 2 GPUs is almost linear, i.e. from 75x to 150x, but after that there is no further speed-up. Why is that? When we parallelise a code, the idea is to maximise the usage of the available resources. With 1 GPU, even though the code trains 75 times faster than the run with no GPUs, there is still scope for benefiting from more computational power. When we add a second GPU, the run-time halves, giving the ideal linear speed-up. At that point, the resources provided to the code are optimal for the amount of computation, and adding more will not help; thus we see no further speed-up with 4 GPUs. We can monitor the resource usage (in our case the GPU usage) using the command nvidia-smi. I will not delve further into this topic, but if you are interested, feel free to contact me.

Final Remarks

  • Feel free to devise your own data distribution strategy. It will often depend on how your data is organised and the sheer size of it.
  • When using GPUs, there is always an overhead for the initial set-up, i.e. depending on the system you are using there might be some time needed for setting up the GPU-CPU communication, environment variables etc. This is done by the system under the hood, so if at the start the code seems to stall for a few seconds it most likely is just setting up the GPUs.
  • For the sake of simplicity, I only time the training part. In principle one might need to benchmark the run-time for the entire code. It depends on what you are trying to optimise.
  • In order to benchmark the performance correctly, one would need to run the same code multiple times to get better statistics on the run-time.
  • One must also check the convergence of the model, i.e. the parallelised training with GPUs should converge to the same loss and accuracy values as seen in the run without GPUs. Furthermore, regardless of the number of GPUs used the accuracy and loss curves should converge to the same values.
