
Deep Learning on Supercomputers

A hands-on guide to scaling a Deep Learning application on BSC's CTE-POWER cluster


(Image from bsc.es)

This post will be used as documentation in the PATC course Introduction to Big Data Analytics at BSC

In a previous post, we saw that supercomputers are a key component of the progress of Artificial Intelligence, and that what drove the increase in effective compute over the last years was the growing parallelization and distribution of the algorithms.

This post will demonstrate how these supercomputers can be used, specifically BSC's CTE-POWER cluster, in which each server has two IBM Power9 CPUs and four NVIDIA V100 GPUs.

In this series of posts, we will use the TensorFlow framework; however, the equivalent PyTorch code does not differ too much. We will use the Keras API because, since the release of TensorFlow 2.0, tf.keras.Model has become the primary way of building neural networks, particularly those not requiring custom training loops.

1 – BSC’s CTE-POWER Cluster

1.1 System Overview

CTE-POWER is a cluster based on IBM Power9 processors, with a Linux Operating System and an Infiniband interconnection network. CTE-POWER has 54 compute servers, each of them:

  • 2 x IBM Power9 8335-GTG @ 3.00GHz (20 cores and 4 threads/core, total 160 threads per node)
  • 512GB of main memory distributed in 16 dimms x 32GB @ 2666MHz
  • 2 x SSD 1.9TB as local storage
  • 2 x 3.2TB NVME
  • 4 x GPU NVIDIA V100 (Volta) with 16GB HBM2.
  • Single Port Mellanox EDR
  • GPFS via one fiber link 10 GBit
  • The operating system is Red Hat Enterprise Linux Server 7.4.
Supercomputer Marenostrum – POWER-CTE cluster at Barcelona Supercomputing Center (Image by author)

More details of its characteristics can be found in the CTE-POWER user's guide and in the manufacturer's information for the AC922 servers.

The allocation of resources from the cluster for the execution of our code starts with an ssh login to the cluster through one of the login nodes, using your account:

$ ssh [email protected]

1.2 Warm-up example: MNIST classification

For convenience, we will consider the same neural network that we used to classify MNIST digits in the previous class, programmed in a Jupyter notebook:

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Dense, Flatten
from tensorflow.keras.utils import to_categorical
import numpy as np
import matplotlib.pyplot as plt
print(tf.__version__)

# Convolutional neural network for MNIST digit classification
model = Sequential()
model.add(Conv2D(32, (5, 5), activation='relu',
          input_shape=(28, 28, 1)))
model.add(MaxPooling2D((2, 2)))
model.add(Conv2D(64, (5, 5), activation='relu'))
model.add(MaxPooling2D((2, 2)))
model.add(Flatten())
model.add(Dense(10, activation='softmax'))
model.summary()

# Load MNIST from the local GPFS copy and normalize the images
mnist = tf.keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data(path='/gpfs/projects/nct00/nct00002/basics-utils/mnist.npz')
train_images = train_images.reshape((60000, 28, 28, 1))
train_images = train_images.astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1))
test_images = test_images.astype('float32') / 255
train_labels = to_categorical(train_labels)
test_labels = to_categorical(test_labels)

model.compile(loss='categorical_crossentropy',
              optimizer='sgd',
              metrics=['accuracy'])
model.fit(train_images, train_labels, batch_size=100,
          epochs=5, verbose=1)
test_loss, test_acc = model.evaluate(test_images, test_labels)
print('Test accuracy:', test_acc)

This is the code MNIST.py (available on GitHub) that we will use as a first case study to show how to launch programs on the CTE-POWER supercomputer.

1.3 Software stack

It is important to remember that we need to load all the modules that build our application's software stack environment. This can be done with the module load command before running the corresponding .py code.

In our case study, we need the following modules that include the required libraries:

module load gcc/8.3.0 cuda/10.2 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
python MNIST.py

If we want to separate the standard output from the standard error messages, we can add the redirection 2>err.txt:

python MNIST.py 2>err.txt

Redirecting the standard error allows us to see the training results that Keras writes to the standard output, without the messages related to the execution environment:

Epoch 1/5
600/600 [======] - 2s 3ms/step - loss: 0.9553 - accuracy: 0.7612
Epoch 2/5
600/600 [======] - 1s 2ms/step - loss: 0.2631 - accuracy: 0.9235
Epoch 3/5
600/600 [======] - 2s 3ms/step - loss: 0.1904 - accuracy: 0.9446
Epoch 4/5
600/600 [======] - 2s 3ms/step - loss: 0.1528 - accuracy: 0.9555
Epoch 5/5
600/600 [======] - 2s 3ms/step - loss: 0.1288 - accuracy: 0.9629
313/313 [======] - 1s 2ms/step - loss: 0.1096 - accuracy: 0.9671
Test accuracy: 0.9671000242233276

Well, our code was executed on the login node, which is shared with other users who are submitting jobs to the SLURM system, but what we really need is to allocate dedicated resources for our code. How can we do it?

1.4 How to allocate computing resources with SLURM

To run code on CTE-POWER we use the SLURM workload manager. A very good Quick Start User Guide can be found here. There are two ways to use it: the sbatch and salloc commands.

The method for submitting jobs on which we will center this hands-on is the SLURM sbatch command. sbatch submits a batch script to SLURM. The batch script may be given to sbatch through a file name on the command line (a .sh file). The batch script may contain options preceded with #SBATCH before any executable commands in the script. sbatch will stop processing further #SBATCH directives once the first non-comment, non-whitespace line has been reached in the script.

sbatch exits immediately after the script is successfully transferred to the SLURM controller and assigned a SLURM job ID. The batch script is not necessarily granted resources immediately; it may sit in the queue of pending jobs for some time before its required resources become available.

By default both standard output and standard error are directed to the files indicated by:

#SBATCH --output=MNIST_%j.out
#SBATCH --error=MNIST_%j.err

where the "%j" is replaced with the job allocation number. The file will be generated on the first node of the job allocation. When the job allocation is finally granted for the batch script, Slurm runs a single copy of the batch script on the first node in the set of allocated nodes. In the following sections, we will introduce other parameters.

An example of a job script that allocates a node with 1 GPU for our case study looks like this (MNIST.sh):

#!/bin/bash
#SBATCH --job-name="MNIST"
#SBATCH -D .
#SBATCH --output=MNIST_%j.out
#SBATCH --error=MNIST_%j.err
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=40
#SBATCH --gres=gpu:1
#SBATCH --time=00:10:00
module load gcc/8.3.0 cuda/10.2 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
python MNIST.py

You can consult the official documentation page to learn about all the options that can be used in the batch script preceded with #SBATCH.

These are the basic directives to submit and monitor jobs with SLURM that we will use in our case study:

  • sbatch <job_script> submits a job script to the queue system.
  • squeue shows all the submitted jobs with their <job_id>.
  • scancel <job_id> removes the job from the queue system, cancelling the execution of its processes if they were still running.

An alternative way to run a job in SLURM is the salloc command. It is used to allocate resources for a job in real time. Typically this is used to allocate resources and spawn a shell; the shell is then used to execute srun commands to launch parallel tasks. More details of its use can be found here.

In summary, this is an example of the sequence of command lines and the expected output of their execution:

[CTE-login-node ~]$ sbatch MNIST.sh
Submitted batch job 4910352
[CTE-login-node ~]$ squeue
JOBID    PARTITION  NAME    USER    ST TIME  NODES  NODELIST
4910352  main       MNIST   userid  R  0:01  1      p9r1n16
[CTE-login-node ~]$ ls
MNIST.py
MNIST.sh
MNIST_4910352.err
MNIST_4910352.out

The standard output and standard error are directed to the files MNIST_4910352.out and MNIST_4910352.err, respectively. I suggest the reader inspect these files before proceeding. Here, the number 4910352 is the job id assigned to the job by SLURM.

2 – Basic concepts in distributed and parallel training Deep Learning

"Methods that scale with computation are the future of Artificial Intelligence" – Rich Sutton, father of reinforcement learning (video 4:49)

Deep Neural Networks (DNN) base their success on building high-capacity models with millions of parameters that are tuned in a data-driven fashion. These models are trained by processing millions of examples, so the development of more accurate algorithms is usually limited by the throughput of the computing devices on which they are trained.

2.1 Performance metrics: Speedup, Throughput and Scalability

In order to make the training process faster we are going to need some performance metrics to measure it. The term performance in these systems has a double interpretation. On the one hand, it refers to the predictive accuracy of the model. On the other, to the computational speed of the process.

The accuracy is independent of the platform, and it is the performance metric used to compare multiple models. In contrast, the computation speed depends on the platform on which the model is deployed. We will measure it with metrics such as Speedup, the ratio of the solution time for the sequential (or 1 GPU) algorithm versus its parallel counterpart (with many GPUs). This is a prevalent concept in our daily argot in the Supercomputing community 😉 .

Another important metric is Throughput; for example, the number of images per unit time that can be processed. This can give us a good benchmark of performance (although it depends on the network type).

Finally, a concept that we usually use is Scalability. It is a more generic concept that refers to the ability of a system to handle a growing amount of work efficiently. These metrics will be highly dependent on the cluster configuration, the type of network used, or the framework’s efficiency using the libraries and managing resources.
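As a minimal illustration of how we will use these metrics later in this hands-on, the following sketch defines Speedup and Throughput as simple Python functions; the helper names and the timing values are hypothetical and used only for illustration:

def speedup(time_1_gpu, time_n_gpus):
    # ratio between the 1-GPU epoch time and the n-GPU epoch time
    return time_1_gpu / time_n_gpus

def throughput(n_images, epoch_time):
    # images processed per second during one epoch
    return n_images / epoch_time

# hypothetical values: 50,000 images, 40 s per epoch on 1 GPU, 11 s on 4 GPUs
print(throughput(50000, 40), "images/s on 1 GPU")
print(speedup(40, 11), "x speedup with 4 GPUs")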

2.2 Parallel computer platforms

For this reason, a parallel and distributed training approach is used. The main idea behind this computing paradigm is to run tasks in parallel instead of serially, as would happen on a single machine.

DNNs are often compute-intensive, making them similar to traditional supercomputing (high-performance computing, HPC) applications. Thus, large learning workloads perform very well on accelerated systems such as general-purpose graphics processing units (GPUs) that have been used in the Supercomputing field.

Multiple GPUs increase both the memory and the compute available for training a DNN. In a nutshell, given a minibatch of training data that we want to process, we have several choices for distributing the work. In the next subsection, we will go into more detail about this.

2.3 Types of parallelism

To distribute the training step, there are two principal implementations; which one performs better will depend on the needs of the application, and sometimes a mix of both approaches can increase the performance.

For example, different layers in a Deep Learning model may be trained in parallel on different GPUs. This training procedure is commonly known as Model parallelism. Another approach is Data parallelism, where we use the same model for every execution unit, but train the model in each computing device using different training samples.

(Image by author)

2.3.1 Data parallelism

In this mode, the training data is divided into multiple subsets, and each of them runs on the same replicated model on a different GPU (worker node). Each device independently computes the errors between its predictions for its training samples and the labeled outputs (the correct values for those training samples), so the workers need to synchronize the model parameters (or their "gradients") at the end of the batch computation to ensure they are training a consistent model, just as if the algorithm ran on a single processor. Therefore, each device must send all of its changes to all of the model replicas on all the other devices.

One interesting property of this setting is that it scales with the amount of data available, and it speeds up the rate at which the entire dataset contributes to the optimization. Also, it requires less communication between nodes, as it benefits from a high amount of computation per weight. On the other hand, the model has to fit entirely on each node, and it is mainly used for speeding up the computation of convolutional neural networks with large datasets.
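To make the idea more concrete, here is a conceptual NumPy sketch (not the TensorFlow API we will use later) of synchronous data parallelism for a simple linear model: each "device" computes the gradient on its own shard of the mini-batch, and averaging those local gradients gives the same update as computing the gradient over the whole mini-batch on a single device. All names and values here are illustrative only:

import numpy as np

rng = np.random.default_rng(0)
w = rng.normal(size=3)                       # replicated model parameters
x = rng.normal(size=(8, 3))                  # a mini-batch of 8 samples
y = x @ np.array([1.0, -2.0, 0.5])           # targets

def gradient(w, x, y):
    # gradient of the mean squared error 0.5*mean((x@w - y)**2) with respect to w
    return x.T @ (x @ w - y) / len(x)

# split the mini-batch across 4 "devices" and average their local gradients (all-reduce)
shards = np.array_split(np.arange(len(x)), 4)
local_grads = [gradient(w, x[idx], y[idx]) for idx in shards]
averaged_grad = np.mean(local_grads, axis=0)

# same update as if a single device had processed the whole mini-batch
print(np.allclose(averaged_grad, gradient(w, x, y)))   # True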

2.3.2 Model parallelism

We could partition the network layers across multiple GPUs (we could even split the work required by individual layers). That is, each GPU takes as input the data flowing into a particular layer, processes the data across several subsequent layers of the neural network, and then sends the data to the next GPU.

In this case (also known as Network Parallelism), the model is segmented into different parts that can run concurrently, and each part runs on the same data on a different node. This method's scalability depends on the degree of task parallelization of the algorithm, and it is more complex to implement than the previous one. It may decrease the communication needs, as workers only need to synchronize the shared parameters (usually once for each forward or backward propagation step), and it works well for GPUs in a single server that share a high-speed bus. It can be used with larger models, as hardware constraints per node are no longer a limitation.

In general, parallelizing the algorithm is more complex to implement than running the same model on different nodes with a subset of the data.

In this Hands-on, we will focus on the Data Parallelism approach.

3 – Case study: Image classification problem

In this section, we will explain how to scale the training of a "real" neural network, ResNet50V2, with 25,613,800 parameters. As a dataset, we will use the popular CIFAR10 dataset. First, we will describe the sequential execution, and later we will parallelize it over many GPUs.

3.1 Dataset: CIFAR10

CIFAR-10 is an established computer-vision dataset used for object recognition. It is a subset of the 80 million tiny images dataset and consists of 60,000 32×32 color images containing 10 object classes, with 6000 images per class. It was collected by Alex Krizhevsky, Vinod Nair, and Geoffrey Hinton. There are 50,000 training images and 10,000 test images.

Learning Multiple Layers of Features from Tiny Images, Alex Krizhevsky, 2009.

We have preloaded the CIFAR-10 dataset on the CTE-POWER supercomputer in the directory /gpfs/projects/nct00/nct00002/cifar-utils/cifar-10-batches-py, downloaded from http://www.cs.toronto.edu/~kriz/cifar.html.

For academic purposes, to make the training harder and to be able to see longer training times for better comparison, we have applied a resize operation to make the images 128×128 in size. We created a custom load_cifar function (/gpfs/projects/nct00/nct00002/cifar-utils/load_cifar.py) that applies this resize operation and splits the data into training and test sets. We can use it as:

import sys
sys.path.append('/gpfs/projects/nct00/nct00002/cifar-utils')
from cifar import load_cifar

load_cifar.py can be obtained from this GitHub repository for readers who want to review it (the students of this course do not need to download it).

3.2 Neural Networks architecture: ResNet

AlexNet, by Alex Krizhevsky, is the neural network architecture that won the ImageNet 2012 competition. GoogLeNet, with its inception module, drastically reduced the parameters of the network (15 times fewer than AlexNet). Others, such as VGGNet, helped to demonstrate that the depth of the network is a critical component of good results. The interesting thing about many of these networks is that we can find them already preloaded in most Deep Learning frameworks.

Keras Applications are prebuilt deep learning models that are made available with the framework. These models differ in architecture and number of parameters; you can try some of them to see how the larger models train more slowly than the smaller ones and achieve different accuracy.

A list of all available models can be found here (the top-1 and top-5 accuracy refers to the model's performance on the ImageNet validation dataset). For this post, we will consider two architectures from the ResNet family as a case study: ResNet50V2 and ResNet152V2.

ResNet is a family of extremely deep neural network architectures showing compelling accuracy and nice convergence behaviors, introduced by He et al. in their 2015 paper, Deep Residual Learning for Image Recognition.

A few months later, the same authors published a new paper, Identity Mappings in Deep Residual Networks, with a new proposal for the basic component, the residual unit, which makes training easier and improves generalization. This led to the V2 versions of these networks:

tf.keras.applications.ResNet50V2(
    include_top=True,
    weights="imagenet",
    input_tensor=None,
    input_shape=None,
    pooling=None,
    classes=1000,
    classifier_activation="softmax",
)
tf.keras.applications.ResNet152V2(
    include_top=True,
    weights="imagenet",
    input_tensor=None,
    input_shape=None,
    pooling=None,
    classes=1000,
    classifier_activation="softmax",
)

The "50" and "152" stand for the number of weight layers in the network. The arguments for both networks are:

  • include_top: whether to include the fully-connected layer at the top of the network.
  • weights: one of None (random initialization), ‘imagenet’ (pre-training on ImageNet), or the path to the weights file to be loaded.
  • input_tensor: optional Keras tensor (i.e. output of layers.Input()) to use as image input for the model.
  • input_shape: optional shape tuple, only to be specified if include_top is False (otherwise the input shape has to be (224, 224, 3) (with 'channels_last' data format) or (3, 224, 224) (with 'channels_first' data format)). It should have exactly 3 input channels, and width and height should be no smaller than 32. E.g. (200, 200, 3) would be one valid value.
  • pooling: optional pooling mode for feature extraction when include_top is False. (a) None means that the output of the model will be the 4D tensor output of the last convolutional block. (b) avg means that global average pooling will be applied to the output of the last convolutional block, and thus the output of the model will be a 2D tensor. (c) max means that global max pooling will be applied.
  • classes: optional number of classes to classify images into, only to be specified if include_top is True and if no weights argument is specified.
  • classifier_activation: A str or callable. The activation function to use on the "top" layer. Ignored unless include_top=True. Set classifier_activation=None to return the logits of the "top" layer.

Note that if weights="imagenet", TensorFlow requires an internet connection to download the ImageNet weights (pre-trained on ImageNet). Since we are not centering our interest on accuracy, we did not download the file with the ImageNet weights; therefore, weights=None must be used.

3.3 Frameworks for parallel and distributed training: TensorFlow and Horovod

We can use frameworks such as TensorFlow or PyTorch to program multi-GPU training on one server. To parallelize the training of the model, you only need to wrap the model with [torch.nn.parallel.DistributedDataParallel](https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html) in PyTorch or with [tf.distribute.MirroredStrategy](https://www.TensorFlow.org/api_docs/python/tf/distribute/MirroredStrategy) in TensorFlow.

However, the number of GPUs that we can place in one server is very limited, and the solution is to put many of these servers together, as we did at BSC with the CTE-POWER supercomputer, where 54 servers are linked together with an InfiniBand network over optical fiber.

In this new scenario, we need an extension of the software stack to deal with multiple distributed GPUs in the neural network training process. There are several options, but in our research group at BSC we decided to use Horovod, from Uber, which plugs into TensorFlow and PyTorch. However, this option is not included in this hands-on; readers interested in Horovod can follow this hands-on about Horovod.

In summary, in the following sections of this hands-on, we will present how to scale the training of a DL neural network on multiple GPUs in one server using the TensorFlow tf.distribute.MirroredStrategy() API.

3.4 How to train the sequential version of ResNet

Before parallelizing a neural network training, let's start with a sequential version of the training. The sequential code to train the previously described CIFAR10 classification problem using a ResNet50 neural network could be ResNet50_seq.py:

import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import models
import numpy as np
import argparse
import time
import sys
sys.path.append('/gpfs/projects/nct00/nct00002/cifar-utils')
from cifar import load_cifar
parser = argparse.ArgumentParser()
parser.add_argument('--epochs', type=int, default=5)
parser.add_argument('--batch_size', type=int, default=2048)
args = parser.parse_args()
batch_size = args.batch_size
epochs = args.epochs
train_ds, test_ds = load_cifar(batch_size)
model = tf.keras.applications.resnet_v2.ResNet50V2(    
        include_top=True, 
        weights=None, 
        input_shape=(128, 128, 3), 
        classes=10)
opt = tf.keras.optimizers.SGD(0.01)
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=opt,
              metrics=['accuracy'])
model.fit(train_ds, epochs=epochs, verbose=2)

ResNet50_seq.py file can be downloaded from the course repository GitHub.

It can be run using the following SLURM script (ResNet50_seq.sh):

#!/bin/bash
#SBATCH --job-name="ResNet50_seq"
#SBATCH -D .
#SBATCH --output=RESNET50_seq_%j.out
#SBATCH --error=RESNET50_seq_%j.err
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=160
#SBATCH --time=00:15:00
module load gcc/8.3.0 cuda/10.2 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML 
python ResNet50_seq.py --epochs 20 --batch_size 256

ResNet50_seq.sh can be downloaded from the course repository GitHub.

If we check the .out file, we can see the training output that Keras gives us:

Epoch 1/20
196/196 - 41s - loss: 2.0176 - accuracy: 0.2584
Epoch 2/20
196/196 - 41s - loss: 1.7346 - accuracy: 0.3648
Epoch 3/20
196/196 - 41s - loss: 1.5624 - accuracy: 0.4271
Epoch 4/20
196/196 - 41s - loss: 1.4427 - accuracy: 0.4715
Epoch 5/20
196/196 - 41s - loss: 1.3523 - accuracy: 0.5090
Epoch 6/20
196/196 - 41s - loss: 1.2699 - accuracy: 0.5417
Epoch 7/20
196/196 - 41s - loss: 1.1894 - accuracy: 0.5719
Epoch 8/20
196/196 - 41s - loss: 1.1048 - accuracy: 0.6076
Epoch 9/20
196/196 - 41s - loss: 1.0136 - accuracy: 0.6439
Epoch 10/20
196/196 - 41s - loss: 0.9174 - accuracy: 0.6848
Epoch 11/20
196/196 - 41s - loss: 0.8117 - accuracy: 0.7256
Epoch 12/20
196/196 - 41s - loss: 0.6989 - accuracy: 0.7705
Epoch 13/20
196/196 - 41s - loss: 0.5858 - accuracy: 0.8117
Epoch 14/20
196/196 - 41s - loss: 0.4870 - accuracy: 0.8482
Epoch 15/20
196/196 - 41s - loss: 0.4003 - accuracy: 0.8749
Epoch 16/20
196/196 - 41s - loss: 0.3194 - accuracy: 0.9040
Epoch 17/20
196/196 - 41s - loss: 0.2620 - accuracy: 0.9227
Epoch 18/20
196/196 - 41s - loss: 0.2008 - accuracy: 0.9421 
Epoch 19/20
196/196 - 41s - loss: 0.1441 - accuracy: 0.9615
Epoch 20/20
196/196 - 41s - loss: 0.0742 - accuracy: 0.9859

For the purpose of this post, to calculate the time (which will be the metric we use to compare), we will use the time that Keras itself reports for each epoch (sometimes we discard the first epoch, as it differs from the rest because it has to create structures in memory and initialize them). Remember that this is an academic example, and this approximate measure of time is enough for the goals of the course.

For any other of the networks available in Keras, simply replace resnet_v2.ResNet50V2 in the code with the corresponding network.
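For example, to use ResNet152V2 (the network proposed as an exercise later in this post), a sketch of the only change needed in the sequential code would be:

import tensorflow as tf   # already imported in ResNet50_seq.py

model = tf.keras.applications.resnet_v2.ResNet152V2(
        include_top=True,
        weights=None,
        input_shape=(128, 128, 3),
        classes=10)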

4 – Parallel training with TensorFlow

[tf.distribute.Strategy](https://www.tensorflow.org/api_docs/python/tf/distribute/Strategy) is a TensorFlow API to distribute training across multiple GPUs or TPUs with minimal code changes (from the sequential version presented in the previous section). This API can be used with a high-level API like Keras, and can also be used to distribute custom training loops.

tf.distribute.Strategy intends to cover a number of distribution use cases along different axes. The official web page of this feature presents all the currently supported combinations; however, in this article we will focus our attention on [tf.distribute.MirroredStrategy](https://www.tensorflow.org/api_docs/python/tf/distribute/MirroredStrategy).

4.1 MirroredStrategy strategy

In this hands-on we will use [tf.distribute.MirroredStrategy](https://www.tensorflow.org/api_docs/python/tf/distribute/MirroredStrategy), which supports synchronous distributed training on multiple GPUs on one server. It creates one replica per GPU device. Each variable in the model is mirrored across all the replicas. These variables are kept in sync with each other by applying identical updates.

With this strategy, efficient all-reduce algorithms are used to communicate the variable updates across the GPUs. By default, it uses NVIDIA NCCL as the all-reduce implementation. More details can be found in this post.

Here is the simplest way of creating MirroredStrategy:

mirrored_strategy = tf.distribute.MirroredStrategy()

This will create a MirroredStrategy instance that will use all the GPUs visible to TensorFlow. It is possible to see the list of available GPU devices by doing the following:

devices = tf.config.experimental.list_physical_devices("GPU")

It is also possible to use a subset of the available GPUs in the system by doing the following:

mirrored_strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

To build the model and compile it inside the MirroredStrategy scope, you can do it in the following way:

with mirrored_strategy.scope():
     model = tf.keras.applications.resnet_v2.ResNet50V2(
             include_top=True, weights=None, 
             input_shape=(128, 128, 3), classes=10)

     opt = tf.keras.optimizers.SGD(learning_rate)

     model.compile(loss='sparse_categorical_crossentropy', 
                   optimizer=opt, metrics=['accuracy'])

This allows us to create distributed variables instead of regular variables: each variable is mirrored across all the replicas and is kept in sync with them by applying identical updates.

Training can be done as usual outside the strategy scope with:

dataset = load_data(batch_size)
model.fit(dataset, epochs=5, verbose=2)

4.2 Distribution performance measurement

In this post, we will consider the epoch time as the measure of the computation time for training a DNN. This approximate measure in seconds, provided by Keras during .fit, is enough for the purpose of this academic post. In our case, we will sometimes discard the first epoch, as it includes creating and initializing structures. Obviously, for certain types of studies, it is necessary to go into more detail, differentiating the data-loading time, feed-forward time, loss-function time, backpropagation time, etc., but that falls outside the scope of the case study we propose in this hands-on.
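If the reader prefers to record these epoch times programmatically instead of reading them from the Keras log, a simple callback such as the following sketch can be passed to model.fit; the class name EpochTimer is ours, not part of Keras:

import time
import tensorflow as tf

class EpochTimer(tf.keras.callbacks.Callback):
    # records the wall-clock time of every epoch during model.fit
    def on_train_begin(self, logs=None):
        self.epoch_times = []
    def on_epoch_begin(self, epoch, logs=None):
        self._start = time.time()
    def on_epoch_end(self, epoch, logs=None):
        self.epoch_times.append(time.time() - self._start)

# usage: timer = EpochTimer()
#        model.fit(train_ds, epochs=5, verbose=2, callbacks=[timer])
#        print(timer.epoch_times[1:])   # discard the first epoch if desired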

4.3 Choose the Batch Size and Learning Rate

When training, it is necessary to allocate memory to store the training samples and the model itself. We have to keep this in mind in order to avoid an out-of-memory error.

Remember that the batch_size is the number of samples that the model sees at each training step and, in general, we want it to be as big as possible (a power of 2). We can determine it with a trial-and-error approach, testing different values until a memory-related error appears:

python ResNet50.py --epochs 1 --batch_size 16
python ResNet50.py --epochs 1 --batch_size 32
python ResNet50.py --epochs 1 --batch_size 64

When using MirroredStrategy with multiple GPUs, the batch size indicated is divided among the replicas. Therefore, the batch_size that we should specify to TensorFlow is equal to the maximum value for one GPU multiplied by the number of GPUs we are using. That is, in our example, we use these flags in the python program:

python ResNet50.py --epochs 5 --batch_size 256  --n_gpus 1
python ResNet50.py --epochs 5 --batch_size 512  --n_gpus 2
python ResNet50.py --epochs 5 --batch_size 1024 --n_gpus 4

In accordance with the batch_size, if we are using MirroredStrategy with multiple GPUs, we want the learning_rate to be:

learning_rate = learning_rate_base*number_of_gpus
opt = tf.keras.optimizers.SGD(learning_rate)

With a larger batch_size, we also want to take bigger steps in the direction of the minimum to preserve the number of epochs needed to converge.
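Putting both rules together, this is a minimal sketch of the scaling we apply in this hands-on, assuming a per-GPU batch size of 256 and a base learning rate of 0.01 (the values used in our case study):

import tensorflow as tf

n_gpus = 4
batch_size = 256 * n_gpus            # global batch size passed to the input pipeline
learning_rate = 0.01 * n_gpus        # learning rate scaled with the number of replicas
opt = tf.keras.optimizers.SGD(learning_rate)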

4.4 Case Study

We will take the example presented in the previous section, ResNet50 with CIFAR10, and train it on the CTE-POWER machine.

Parallel code for ResNet50 neural network

Following the steps presented in the above section on how to apply MirroredStrategy, below we present the resulting parallel code:

import tensorflow as tf 
from tensorflow.keras import layers 
from tensorflow.keras import models
import numpy as np
import argparse
import time
import sys
sys.path.append('/gpfs/projects/nct00/nct00002/cifar-utils')
from cifar import load_cifar
parser = argparse.ArgumentParser()
parser.add_argument('--epochs', type=int, default=5)
parser.add_argument('--batch_size', type=int, default=2048)
parser.add_argument('--n_gpus', type=int, default=1)
args = parser.parse_args()
batch_size = args.batch_size
epochs = args.epochs
n_gpus = args.n_gpus
train_ds, test_ds = load_cifar(batch_size)
device_type = 'GPU'
devices = tf.config.experimental.list_physical_devices(
          device_type)
devices_names = [d.name.split("e:")[1] for d in devices]
strategy = tf.distribute.MirroredStrategy(
           devices=devices_names[:n_gpus])
with strategy.scope():
     model = tf.keras.applications.resnet_v2.ResNet50V2(
             include_top=True, weights=None,
             input_shape=(128, 128, 3), classes=10)
     opt = tf.keras.optimizers.SGD(0.01*n_gpus)
     model.compile(loss='sparse_categorical_crossentropy', 
                   optimizer=opt, metrics=['accuracy'])
model.fit(train_ds, epochs=epochs, verbose=2)

SLURM script

The SLURM script to submit the jobs for execution is:

#!/bin/bash
#SBATCH --job-name="ResNet50"
#SBATCH -D .
#SBATCH --output=ResNet50_%j.output
#SBATCH --error=ResNet50_%j.err
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=160
#SBATCH --gres=gpu:4
#SBATCH --time=00:30:00
module purge; module load gcc/8.3.0 cuda/10.2 cudnn/7.6.4 nccl/2.4.8 tensorrt/6.0.1 openmpi/4.0.1 atlas/3.10.3 scalapack/2.0.2 fftw/3.3.8 szip/2.1.1 ffmpeg/4.2.1 opencv/4.1.1 python/3.7.4_ML
python ResNet50.py --epochs 5 --batch_size 256 --n_gpus 1
python ResNet50.py --epochs 5 --batch_size 512 --n_gpus 2
python ResNet50.py --epochs 5 --batch_size 1024 --n_gpus 4

We use the same SLURM script for all the executions. Pay attention to indicating the number of GPUs required with --gres=gpu:4.

Results

Once we run the previous script, in the file that stores the standard output we find the following execution times:

python ResNet50.py --epochs 5 --batch_size 256 --n_gpus 1 
Epoch 1/5 
196/196 - 49s - loss: 2.0408 - accuracy: 0.2506
Epoch 2/5
196/196 - 45s - loss: 1.7626 - accuracy: 0.3536
Epoch 3/5
196/196 - 45s - loss: 1.5863 - accuracy: 0.4164
Epoch 4/5
196/196 - 45s - loss: 1.4550 - accuracy: 0.4668
Epoch 5/5
196/196 - 45s - loss: 1.3539 - accuracy: 0.5070
python ResNet50.py --epochs 5 --batch_size 512 --n_gpus 2
Epoch 1/5
98/98 - 26s - loss: 2.0314 - accuracy: 0.2498
Epoch 2/5
98/98 - 24s - loss: 1.7187 - accuracy: 0.3641
Epoch 3/5
98/98 - 24s - loss: 1.5731 - accuracy: 0.4207
Epoch 4/5
98/98 - 23s - loss: 1.4543 - accuracy: 0.4686
Epoch 5/5
98/98 - 23s - loss: 1.3609 - accuracy: 0.5049
python ResNet50.py --epochs 5 --batch_size 1024 --n_gpus 4 
Epoch 1/5
49/49 - 14s - loss: 2.0557 - accuracy: 0.2409
Epoch 2/5
49/49 - 12s - loss: 1.7348 - accuracy: 0.3577
Epoch 3/5
49/49 - 12s - loss: 1.5696 - accuracy: 0.4180
Epoch 4/5
49/49 - 12s - loss: 1.4609 - accuracy: 0.4625
Epoch 5/5
49/49 - 11s - loss: 1.3689 - accuracy: 0.5010

It is important to note that we center our interest on the computational speed of the process rather than the model's accuracy. For this reason, we only execute a small number of epochs during the training since, as we will see, the training time per epoch is constant, and with 5 epochs we achieve the same accuracy in all cases. That means that 5 epochs are enough to compare the 3 options, which is the purpose of these tests.

It is evident that if we wanted to do a rigorous study, we should carry out several runs and then take the average of the times obtained in each of them. But given the academic purpose of this post (and the cost of resources, which we have to save!), a single execution is enough.

4.5 Analysis of the results

In summary, the time required for one epoch is as shown in the following plot:

We can translate this to images per second (since we know there are 50,000 images), which gives us the throughput:

Finally, as I said, the speedup is the most relevant metric:

It is an almost linear speedup. We refer to the speedup as linear when the workload is equally divided among the GPUs.
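For reference, these plots can be reproduced directly from the epoch times reported above (about 45 s, 23 s and 12 s per epoch for 1, 2 and 4 GPUs) and the 50,000 training images; a small sketch:

epoch_time = {1: 45, 2: 23, 4: 12}    # seconds per epoch (steady-state epochs)
n_images = 50000

for n_gpus, t in epoch_time.items():
    print(f"{n_gpus} GPU(s): {n_images / t:6.0f} images/s, "
          f"speedup = {epoch_time[1] / t:.2f}x")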

5 – It is time to learn by doing with ResNet152V2

If you want to learn more and consolidate the knowledge acquired, now it's your turn to get your hands dirty and reproduce the above results for ResNet152V2, and then compare them with those presented in the previous section. Are you ready? Come on!

Remember that the first step is to find the best batch_size. If you plot the results, you should find results equivalent to those shown below:

A couple of relevant things can be observed. The first is that the time to run a ResNet152 epoch is much longer, and therefore the throughput in images per second is much lower than for the ResNet50 network. Why is this happening? The ResNet152 network is deeper, meaning that it has more layers, and therefore the training time is higher.
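As a quick sanity check of this intuition, the following sketch (with weights=None, so no download is needed) compares the number of parameters of both networks as configured in our case study:

import tensorflow as tf

for net in (tf.keras.applications.ResNet50V2, tf.keras.applications.ResNet152V2):
    m = net(include_top=True, weights=None, input_shape=(128, 128, 3), classes=10)
    print(net.__name__, f"{m.count_params():,} parameters")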

It can be seen that the speedup for ResNet152 is no longer linear; could you try to give a possible explanation of why this is happening? Obviously it depends on many factors and a detailed analysis would be required; however, the larger size of the network adds additional latency for the synchronization of its parameters.

6 – Summary

In this hands-on, we have presented how to parallelize the training of a single Deep Neural Network over many GPUs in one server using the tf.distribute.MirroredStrategy() TensorFlow API.

As we can see in the results, the accuracy achieved by our model is more or less constant, independent of the number of GPUs. In summary, using a distributed strategy, our model can learn the same but faster, thanks to parallelization!

Thanks to parallelization, Deep Learning can learn the same but faster!


GitHub of this course: https://github.com/jorditorresBCN/PATC-2021


Acknowledgment: Many thanks to Juan Luis Domínguez and Oriol Aranda from BSC-CNS, who wrote the first version of the codes that appear in this hands-on, and to Carlos Tripiana and Félix Ramos for the essential support using the POWER-CTE cluster. Also many thanks to Alvaro Jover Alvarez, Miquel Escobar Castells and Raul Garcia Fuentes for their contributions to the proofreading of this document.

