Horovod is a popular framework for running distributed training on multiple GPU workers and across multiple hosts. Elastic Horovod is an exciting new feature of Horovod that introduces support for fault-tolerance, enabling training to continue uninterrupted, even in the face of failing or resuming hosts. In this post I will explain how Elastic Horovod can be used to reduce cost in a distributed training environment and demonstrate the configuration steps required to run it on Amazon Elastic Compute Cloud (Amazon EC2) spot instances.
This post has four parts. In part 1 we describe how to reduce training costs using spot instances. In part 2 we discuss data distributed training and introduce the challenge of training on multiple spot instances. In part 3 we describe how Elastic Horovod addresses this challenge and elaborate on some aspects of the solution. In part 4 we demonstrate an example of Elastic Horovod running on Amazon EC2 spot instances.
The story I will tell, and the examples I will share, are based on Horovod with Gloo version 0.21.2, PyTorch version 1.4, and TensorFlow version 2.3. These frameworks continue to evolve rapidly, and some of the API definitions, usage flows, and function behaviors are likely to change. Be sure to reevaluate any assumptions we will make and conclusions we will draw against future releases.
Disclaimer 1: The intention of this post is to draw attention to this intriguing and important new feature. It is not, necessarily, to recommend its use. Whether or not to integrate Elastic Horovod should be determined by a wide variety of considerations, some of which we will discuss below.
Disclaimer 2: This post is by no means intended to replace the Elastic Horovod documentation. We will refer to the documentation numerous times, elaborate on some of the delicate points, extend one of the samples, and demonstrate an end to end example. However, the official documentation should be your source of reference for any of your own implementations.
Part 1: Reducing Training Costs With Spot Instances
One of the main challenges of modern day DNN training projects is managing training costs. The hardware required to train models is expensive, and taking into account that we often require running multiple training experiments, sometimes on multiple devices, it is not hard to see how cost could quickly become an issue.
One compelling way of lowering costs when training in the cloud is by using discounted compute engines from surplus cloud service capacity. In AWS these are called Amazon EC2 Spot Instances, in Google Cloud they are called Preemptible VM Instances, and in Microsoft Azure they are called Low-Priority VMs. The precise details of the offerings tend to vary, but the common denominator is that they all offer significant discounts for unused compute engines.
The trade-off is that, contrary to on-demand or reserved instances, these machines might not be available (e.g. during peak usage), and even if successfully acquired, they can be preempted at any time. As a result, spot instances might not be a good choice for time critical training tasks. But for other tasks they can potentially provide significant monetary savings.
Of course, the likelihood of premature termination of a training job needs to be taken into consideration. Nobody wants to train a model for hours only to have their training machine preempted just as they are about to finish. As I described in a previous post we can protect against this by developing fault tolerance into our training applications. Specifically, by periodically storing model checkpoints we can ensure that a terminated training session can resume from the last stored checkpoint rather than starting to train from scratch. The frequency at which to store checkpoints should be determined by weighing the computation overhead of storing checkpoints, against the overhead of having to rerun training from the last stored checkpoint in the case of termination. Storing checkpoints at a high frequency will reduce the overhead of resuming after termination, but will increase the overhead of storing the model weights. Storing checkpoints at a low frequency reduces the time spent capturing the model weights, but increases the risk of longer periods of retraining in the case of premature termination.
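For illustration, here is a minimal sketch of periodic checkpoint storing in PyTorch. The path and frequency are illustrative assumptions and should be tuned according to the trade-off described above.
import torch

CHECKPOINT_PATH = "/checkpoints/latest.pt"  # assumption: storage that survives a spot interruption (e.g. EBS or S3-synced)
CHECKPOINT_EVERY = 100                      # steps between checkpoints

def save_checkpoint(model, optimizer, step):
    torch.save({"step": step,
                "model": model.state_dict(),
                "optimizer": optimizer.state_dict()},
               CHECKPOINT_PATH)

# inside the training loop:
#     if step % CHECKPOINT_EVERY == 0:
#         save_checkpoint(model, optimizer, step)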
Part 2: Accelerating Training Through Data Distribution
A common method for accelerating DNN development is by performing data distributed training. In data distributed training, we train our model on multiple workers (GPUs) in parallel. The multiple workers can be on a single device (with multiple GPUs), or on multiple devices. At each training step, each worker trains on a different batch of samples and then shares its resultant gradients with all of the other workers. Denoting the number of workers by N, the effect of distributed training is that we end up training on N batches per training step. If configured correctly, this could reduce the number of steps required for model convergence by a factor of N.
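To illustrate the mechanism, the following sketch (not taken from the training script we present later) shows how Horovod averages a locally computed gradient across all workers; applying the averaged gradient is what makes each step effectively cover N batches.
import torch
import horovod.torch as hvd

hvd.init()  # assumes the script is launched with horovodrun

# stand-in for a gradient computed on this worker's local batch
local_grad = torch.randn(10)

# hvd.allreduce averages the tensor across all hvd.size() workers by default
avg_grad = hvd.allreduce(local_grad)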
There is a growing trend to distribute training of models, especially large models, across more and more (GPU) workers, and on more and more devices. Naturally, the use of multiple expensive machines magnifies the potential savings of training on low cost spot instances. However, in this case we now need to consider the possibility of a spot interruption on any one of the multiple training devices. (An interesting question is whether or not using N devices over 1/N-th the number of training steps increases our overall vulnerability to spot interruptions. Another interesting question is whether a spot interruption of one instance increases the likelihood of spot interruptions of the other instances. While it is difficult to anticipate spot interruptions or to calculate their likelihood, there are strategies you can adopt for reducing vulnerability to multiple spot interruptions. For more on the topic see here.)
In many distributed training frameworks, including Horovod, the entire training session will fail in the event of a spot termination of a single device. Since the cost overhead of a spot interruption in the case of N workers is much higher, it makes sense to increase the frequency of capturing checkpoints (as described above). But the cost overhead of capturing each checkpoint is also much higher, as there are now N workers that are paused during the checkpoint capturing.
One might wonder why, in the case of multiple training devices, do we need to terminate the entire training session when only one device has been interrupted. Why can’t training simply continue on the remaining devices? This is where Elastic Horovod comes into the picture.
Part 3: Elastic Horovod
Introduced in Horovod version 0.20, Elastic Horovod supports scaling the number of workers up and down dynamically, without interrupting the training process. In particular, if you are running with multiple spot instances and one of them is interrupted, you will not need to restart the training session (from the latest checkpoint), saving you significant time (and cost) overhead. This also applies when the same spot instance resumes activity (if it is configured to be "persistent"), or a new spot instance is introduced. In the next subsections we will discuss some of the aspects of this feature.
Horovod with Gloo
Elastic Horovod requires the Gloo controller for coordinating work between Horovod processes. This is in contrast to the more common MPI controller, which does not support elastic mode. See the Horovod installation guide for more details on Gloo.
Discovering Hosts
In order to support dynamic scaling, Elastic Horovod requires a mechanism for discovering host devices. This is provided through a user defined "host discovery script" that is passed to the horovodrun command line. The script must be defined such that it outputs the list of the names of the available hosts. See here for more details. Here is an example of a very simple host discovery bash script that iterates over a predefined list of IP addresses and prints only the ones from which it receives a successful ping response:
#!/bin/bash
hostArray=(
  "10.0.1.20"
  "10.0.1.21"
  "10.0.1.22"
)
for host in "${hostArray[@]}"; do
  if ping -c 1 "$host" &> /dev/null
  then
    echo "$host"
  fi
done
In a real world scenario the host discovery script will likely be much more sophisticated, and will depend on your mechanism for allocating worker machines and how you assign them IP addresses or hostnames.
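For example, on AWS a discovery script could query the EC2 API instead of pinging a fixed list of addresses. The sketch below assumes the worker instances carry a hypothetical horovod-role=worker tag and that the dispatcher has permission to call DescribeInstances.
#!/usr/bin/env python
import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")
response = ec2.describe_instances(
    Filters=[
        {"Name": "tag:horovod-role", "Values": ["worker"]},  # hypothetical tag
        {"Name": "instance-state-name", "Values": ["running"]},
    ]
)
for reservation in response["Reservations"]:
    for instance in reservation["Instances"]:
        # horovodrun also accepts "<host>:<slots>" lines if the slot count varies per host
        print(instance["PrivateIpAddress"])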
PyTorch Example
Here is an example of how to create a simple training script using PyTorch and Elastic Horovod. The script is based on the template proposed here, with the addition of the missing functionality. The portions of the code specific to Elastic Horovod are marked with comments.
import torch
import horovod.torch as hvd
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim

hvd.init()
torch.cuda.set_device(hvd.local_rank())
# use the GPU assigned to this worker (set above via set_device)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
lr = 0.001
model = models.resnet50(pretrained=True)
model.to(device)
loss_optim = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)

def get_random_batch():
    batch_size = 2
    data_in = torch.rand(batch_size, 3, 64, 64).to(device)
    target = torch.zeros(batch_size).long().to(device)
    return data_in, target

# training loop must be wrapped by @hvd.elastic.run
@hvd.elastic.run
def train(state):
    for state.epoch in range(state.epoch, 100):
        print("Epoch", state.epoch)
        for state.batch in range(state.batch, 100):
            data, target = get_random_batch()
            optimizer.zero_grad()
            output = model(data)
            loss = loss_optim(output, target)
            loss.backward()
            optimizer.step()
        # commit the state at the end of each epoch
        # see documentation for how to choose frequency of 'commit'
        state.commit()
        state.batch = 0

def on_state_reset():
    # adjust learning rate on reset
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr * hvd.size()

# state object tracks and synchronizes state among the workers
state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
state.register_reset_callbacks([on_state_reset])
train(state)
Here is an example of how to run Elastic Horovod with a discover_hosts.sh bash script, host devices containing 4 GPUs (slots) each, and the Python script above (which we named train.py):
horovodrun -np 8 \
    --min-np 4 \
    --max-np 12 \
    --host-discovery-script discover_hosts.sh \
    --slots 4 \
    python train.py
In this example, the training will start once 8 workers are available, will continue so long as at least 4 workers remain available, and will not use more than 12 workers at any given time.
The Elastic State Object
Synchronization between training workers is managed by the elastic state object. This object encapsulates all of the variables that need to be synchronized between workers, including the model weights, optimizer variables, epoch, and batch number.
The state object includes a commit function for backing up the training state. This is a fail-safe intended to protect against the possibility of state corruption due to an unexpected crash of one of the worker threads. Similar to the dilemma discussed above regarding the choice of checkpoint capturing frequency, deciding how often to commit the training state object is a trade-off between reducing the overhead of retraining from the last backup in the case of failure, and the overhead of the commit action. Our goal should be to configure our environment such that unexpected crashes are extremely unlikely, and reduce commits to a minimum. For example, we can rely on AWS spot interruption notices to ensure spot interruptions are handled by the discovery script in a graceful manner, and avoid unexpected connection failures.
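For example, here is a minimal sketch of how a small agent running on each worker (an assumption of ours, not part of Horovod) might detect a pending interruption via the EC2 instance metadata service; the host discovery script could then stop reporting that host so Horovod removes it gracefully. IMDSv1 is assumed here; IMDSv2 requires a session token.
import urllib.request
import urllib.error

# the endpoint returns 404 until an interruption is scheduled for this instance
METADATA_URL = "http://169.254.169.254/latest/meta-data/spot/instance-action"

def interruption_pending(timeout=0.5):
    try:
        with urllib.request.urlopen(METADATA_URL, timeout=timeout):
            return True
    except urllib.error.URLError:
        return False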
Important tip regarding the Elastic Horovod Keras API: When training with TensorFlow Keras, the elastic state commitment is managed via the hvd.elastic.CommitStateCallback callback. Beware that the default behavior of this callback (as demonstrated in the elastic Keras example) is to commit after every batch. This will slow down your training significantly. To avoid this, specify a batches_per_commit value in the callback constructor.
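As an illustration, here is a hedged sketch of how such a callback list might be constructed, assuming a state object has already been created as in the elastic Keras example; the value 500 is an arbitrary assumption.
import horovod.tensorflow.keras as hvd

# assumption: `state` was created as in the elastic Keras example, e.g.
# state = hvd.elastic.KerasState(model, batch=0, epoch=0)
callbacks = [
    # commit only once every 500 batches rather than after every batch (the default)
    hvd.elastic.CommitStateCallback(state, batches_per_commit=500),
    # keep the batch and epoch counters in the state object up to date
    hvd.elastic.UpdateBatchStateCallback(state),
    hvd.elastic.UpdateEpochStateCallback(state),
]
# `callbacks` is then passed to model.fit(...) inside the @hvd.elastic.run wrapped train function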
Reset Callback Function
The reset callback function is responsible for making changes to the model configuration based on the addition or removal of training workers. Special attention is required for training hyperparameters that may be dependent on the global batch size, most notably the optimizer parameters. The performance of many popular optimizers (such as Adam) depends on the value of the global batch size. If the global batch size changes during training without appropriate adjustments to the optimizer settings, training convergence can be negatively impacted. The Elastic Horovod documentation recommends scaling the learning rate according to the number of active workers (as in the PyTorch sample above) or using an optimizer (such as Adasum) that is less sensitive to changes in the number of workers. Another option is to modify the local batch size on each worker so that the global batch size is unchanged (assuming the GPU memory allows for this and that it does not result in GPU resource under-utilization). But there is no guarantee that these techniques will suffice. Before adopting elastic training, you should verify that your training algorithms are able to handle changes to the number of workers without harming convergence.
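For instance, here is a hedged sketch of an alternative reset callback that keeps the global batch size fixed by adjusting the per-worker batch size; global_batch_size and the data loader rebuild are illustrative assumptions, not part of the Horovod API.
global_batch_size = 64  # illustrative target global batch size

def on_state_reset():
    # adjust the per-worker batch size so that the global batch size stays (roughly) constant;
    # this is only approximate when global_batch_size is not divisible by hvd.size()
    per_worker_batch_size = max(1, global_batch_size // hvd.size())
    # recreate the data loader (and anything else derived from the batch size) here
    ...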
Data Partitioning
One of the decisions that you need to make when performing multi-worker data distributed training is how to divide your training data between the different workers. This decision is somewhat complicated in the elastic training scenario. If you choose to shard your data such that each worker trains on an independent subset of the data, then you may want to repartition the data each time the number of workers changes to ensure that every data sample receives the same attention.
You can avoid the need to update the dataset with every change in the topology by defining each worker to train on a random shuffle of the entire dataset. While the resulting data sampling can be quite different than in the case of sharding (e.g. the appearances of a given sample over the course of training will not necessarily be dispersed as evenly), in many cases it will not impact the ability of the training to converge, though you may want to verify this on your own model.
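The following sketch contrasts the two strategies for a PyTorch dataset; the dataset object and the batch size are illustrative assumptions.
import horovod.torch as hvd
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# option 1: sharding - each worker trains on a disjoint subset; the sampler must be
# recreated (its num_replicas and rank change) whenever the number of workers changes
sampler = DistributedSampler(dataset, num_replicas=hvd.size(), rank=hvd.rank())
sharded_loader = DataLoader(dataset, batch_size=32, sampler=sampler)

# option 2: each worker trains on its own random shuffle of the full dataset - no
# repartitioning is required when workers are added or removed
shuffled_loader = DataLoader(dataset, batch_size=32, shuffle=True)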
Host Exclusion Policy
Elastic Horovod includes a relatively strict policy for excluding workers that become unresponsive. (At the time of this writing the Horovod source code refers to this feature as "blacklisting".) This might impose a limitation in a training environment in which hosts might be interrupted only to resume later on with the same IP address or hostname. At the time of this writing, there is an open Pull Request aimed at relaxing this policy, which will hopefully make its way into an upcoming release. In any case, you may need to customize the exclusion policy by overriding the default behavior.
Vulnerability of the Dispatcher
Elastic Horovod will enable you to recover from the failure of any of the hosts, with the exception of the host on which the training session is launched using the horovodrun command. If this primary machine fails, the entire training session is terminated.
To increase the fault tolerance of your system you can run the horovodrun command on a non-preemptible device, such as an on-demand AWS EC2 instance. To reduce cost you can designate this device as a dedicated dispatcher without any of its own GPU workers. However, there is one potential issue with this approach. Built into the current Horovod code is an assumption that the name of the network interface will be the same across all devices. While this is often indeed the case across identical GPU worker devices, it might not be true on a non-GPU device. We will see an example of this in the next section where we demonstrate Elastic Horovod on EC2 instances.
There is an open feature request to address this issue, so hopefully this will be fixed in a future version. In the meantime you can either try to rename the network interface, or try the following simple workaround (hack): open horovod/runner/gloo_run.py, and in the launch_gloo_elastic function create a list of dispatcher NICs alongside the existing nics list and pass it into the subsequent network.get_driver_ip call instead of nics, as demonstrated in the code block below.
nics = get_common_interfaces(driver)
# hardcode the dispatcher's network interface name (here 'ens5') so that the
# driver IP is resolved on the dispatcher rather than on the GPU workers
dispatcher_nics = ['ens5']
server_ip = network.get_driver_ip(dispatcher_nics)
Another option to consider is to configure a subset of your GPU devices as on-demand instances. While this solution may be more costly, it has the added advantage of ensuring continuous training even in the event of zero spot instance availability.
Elastic Ray
Ray is a popular framework for building distributed applications with support for launching cloud clusters and automatic cluster scaling. Elastic Horovod includes integration with Ray which can be leveraged to simplify the environment setup. See the documentation for how to easily extend your script to use Ray functionality.
Part 4: Elastic Horovod on Amazon EC2 Spot Instances
In this section we will demonstrate an end to end example of Elastic Horovod on an Amazon EC2 cluster. Special thanks to my colleague Max Rabin for his help with this section.
There are many different ways of configuring and launching cloud clusters, including high level APIs (such as Ray). We chose to demonstrate instance creation using the boto3 Python API, but the example can be easily adapted to other methods.
Step 1: Create EC2 Spot Instances
We start by launching three p2.xlarge spot instance host devices and a single c5.xlarge on-demand instance that will act as our dispatcher. We chose single GPU devices (i.e. slots=1) but in a real world scenario you will often get better performance with fewer hosts and more slots per host (e.g. p2.8xlarge) than with more hosts with fewer slots per host.
In the code block below you can find an example of how to create the three GPU spot instances. To create the dispatcher instance, set the MaxCount and MinCount settings to 1, change the instance type to c5.xlarge, and remove the InstanceMarketOptions settings (a sketch of this variant follows the code block). Make note of the instance IDs that are returned from the creation call, as these can be used to manage the instances from the command line.
import boto3

ec2 = boto3.resource('ec2', region_name="us-east-1")
instances = ec2.create_instances(
    MaxCount=3, MinCount=3,
    ImageId='ami-072519eedc1730252',  # replace with ML AMI of choice
    InstanceType='p2.xlarge',
    SubnetId='<subnet id>',  # <-- fill this in
    IamInstanceProfile={'Arn': '<InstanceProfile>'},  # <-- fill this in
    SecurityGroupIds=['<SecurityGroupIds>'],  # <-- fill this in
    InstanceMarketOptions={
        'MarketType': 'spot',
        'SpotOptions': {
            "SpotInstanceType": "persistent",
            "InstanceInterruptionBehavior": "stop"
        }
    }
)
print(instances)
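For completeness, here is a sketch of the corresponding dispatcher creation call, reusing the ec2 resource from the block above and the same placeholder values to fill in.
dispatcher = ec2.create_instances(
    MaxCount=1, MinCount=1,
    ImageId='ami-072519eedc1730252',  # replace with ML AMI of choice
    InstanceType='c5.xlarge',
    SubnetId='<subnet id>',  # <-- fill this in
    IamInstanceProfile={'Arn': '<InstanceProfile>'},  # <-- fill this in
    SecurityGroupIds=['<SecurityGroupIds>'],  # <-- fill this in
)
print(dispatcher)  # note the returned instance ID for later management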
For our image ID, we chose the latest Ubuntu 18.04 AWS machine learning image. Here’s a snippet of command line code for extracting the latest image.
aws ec2 describe-images --owners amazon \
    --query 'Images[?Name!=`null`]|[?starts_with(Name,`Deep Learning AMI (Ubuntu 18.04)`) == `true`].[ImageId,Name,CreationDate]' \
    --output text | sort -k2 | tail
Step 2: SSH setup
Horovod relies on passwordless SSH communication between the dispatcher and all of the workers. To set this up, connect to the EC2 instances using one of the documented mechanisms, create passwordless SSH keys on the dispatcher, and copy the public key over to each of the host devices.
In a real world scenario this step should be automated.
Step 3: As the Ubuntu user, activate the virtual environment and install Horovod
This step needs to be performed on all of the instances.
source activate pytorch_p36
pip install --upgrade pip
HOROVOD_WITH_GLOO=1 pip install --no-cache-dir horovod
Run ifconfig on the dispatcher to view the name of the network interface and update the horovod/runner/gloo_run.py file as described in the previous section.
Step 4: Configure discover_hosts.sh script
Copy the discover_hosts.sh we created above onto the dispatcher instance, and modify the hostArray with the IPs of the three host instances.
Make sure that the script is executable and that its directory is in the $PATH environment variable.
Step 5: Perform PyTorch training with horovodrun
Copy the PyTorch script above to each of the host devices, and run the following tests:
Test 1 – Host addition: Stop one of the three hosts. This can be done via the Amazon EC2 dashboard or using the AWS CLI:
aws ec2 stop-instances --instance-ids i-<id1>
Run horovodrun on the dispatcher with the two active hosts:
horovodrun -np 2 --min-np 1 --max-np 3 --host-discovery-script discover_hosts.sh --slots 1 python train.py
While the training is running, start the third EC2 instance (from the dashboard or using the CLI, as below) and verify that the training session identifies the addition and adds the third instance as a training host.
aws ec2 start-instances --instance-ids i-<id1>
Test 2 – Host removal: Since we cannot control the timing of an actual spot termination, we will simulate one by simply stopping one of the hosts. Start the training with all three hosts and, after a few training steps, stop one of them, as before. Verify that the removal is identified and that the training session is able to proceed with just the two remaining hosts.
Note that the host removal that we just performed would be considered ungraceful and would result in the failing host being added to the exclusion list. (If you restart the same machine with the same IP, it will not be re-added to the same training session.) In a real world scenario, the host discovery mechanism should be enhanced to ensure that spot interruptions are handled gracefully.
Test 3 – Compare runtime to non-elastic Horovod: You may justifiably want to verify that the added functionality does not come at any extra cost. To do that compare the runtime to a non-elastic run:
horovodrun --gloo \
    -np 3 \
    -H server1:1,server2:1,server3:1 \
    python train.py
Test 4 – Measure impact of commit frequency: Vary the frequency of the elastic state commits and measure the impact on the training runtime.
Step 6: Clean up
Don’t forget to delete all EC2 instances at the end of your work.
This can be done from the EC2 dashboard or using the AWS CLI:
aws ec2 terminate-instances --instance-ids i-<id1> i-<id2> ...
Summary
Elastic Horovod is a compelling new feature of Horovod that enables training over a cluster of preemptible instances without the potential overhead of having to restart training from a checkpoint in the event of device interruptions. Such a solution can facilitate significant training cost reductions with minimal impact on the training progression. However, Elastic Horovod is probably not the best solution for every team and every project. Time-critical tasks are often better off running on fully reliable on-demand instances, and even for non-critical training tasks you should verify that convergence is not impacted by the dynamically changing number of workers. Best of luck!