Effective Load Balancing with Ray on Amazon SageMaker

A method for increasing DNN training efficiency and reducing training costs

Chaim Rand
Towards Data Science


Photo by Fineas Anton on Unsplash

In previous posts (e.g., here) we expanded on the importance of profiling and optimizing the performance of your DNN training workloads. Training deep learning models, especially large ones, can be an expensive undertaking. Your ability to maximize the utilization of your training resources in a manner that both accelerates your model convergence and minimizes training costs can be a decisive factor in the success of your project. Performance optimization is an iterative process in which we identify and address the performance bottlenecks in our application, i.e., the portions of our application that are preventing us from increasing resource utilization and/or accelerating the run time.

This post is the third in a series of posts that focus on one of the more common performance bottlenecks that we encounter when training deep learning models, the data pre-processing bottleneck. A data pre-processing bottleneck occurs when our GPU (or alternative accelerator) — typically the most expensive resource in our training setup — finds itself idle while it waits for data input from overly tasked CPU resources.

An image from the TensorBoard profiler tab demonstrating a typical footprint of a bottleneck on the data input pipeline. We can clearly see long periods of GPU idle time on every seventh training step. (By Author)

In our first post on the topic we discussed and demonstrated different ways of addressing this type of bottleneck, including:

  1. Choosing a training instance with a CPU to GPU compute ratio that is more suited to your workload,
  2. Improving the workload balance between the CPU and GPU by moving some of the CPU operations to the GPU, and
  3. Offloading some of the CPU computation to auxiliary CPU-worker devices.

We demonstrated the third option using the TensorFlow Data Service API, a solution specific to TensorFlow, in which a portion of the input data processing can be offloaded onto other devices using gRPC as the underlying communication protocol.

In our second post, we proposed a more general-purpose gRPC-based solution for using auxiliary CPU workers and demonstrated it on a toy PyTorch model. Although it required a bit more manual coding and tuning than the TensorFlow Data Service API, the solution provided much greater robustness and allowed for the same optimization in training performance.

Load Balancing with Ray

In this post we will demonstrate an additional method for using auxiliary CPU workers that aims to combine the robustness of the general-purpose solution with the simplicity and ease-of-use of the TensorFlow-specific API. The method we will demonstrate will use Ray Datasets from the Ray Data library. By leveraging the full power of Ray’s resource management and distributed scheduling systems, Ray Data is able to run our training data input pipeline in a manner that is both scalable and distributed. In particular, we will configure our Ray Dataset in such a way that the library will automatically detect and utilize all of the available CPU resources for pre-processing the training data. We will further wrap our model training loop with a Ray AIR Trainer so as to enable seamless scaling to a multi-GPU setting.

Deploying a Ray Cluster on Amazon SageMaker

A prerequisite for using the Ray framework and the utilities it offers in a multi-node environment is the deployment of a Ray cluster. In general, designing, deploying, managing, and maintaining such a compute cluster can be a daunting task and often requires a dedicated devops engineer (or team of engineers). This can pose an insurmountable obstacle for some development teams. In this post we will demonstrate how to overcome this obstacle using AWS’s managed training service, Amazon SageMaker. In particular, we will create a SageMaker heterogeneous cluster with both GPU instances and CPU instances and use it to deploy a Ray cluster at startup. We will then run the Ray AIR training application on this Ray cluster while relying on Ray’s backend to perform effective load balancing across all of the resources in the cluster. When the training application is completed, the Ray cluster will be torn down automatically. Using SageMaker in this manner enables us to deploy and use a Ray cluster without the overhead that is commonly associated with cluster management.

Ray is a powerful framework that enables a wide range of machine learning workloads. In this post we will demonstrate just a few of its capabilities and APIs using Ray version 2.6.1. This post should not be used as a replacement for the Ray documentation. Be sure to check out the official documentation for the most appropriate and up-to-date use of the Ray utilities.

Before we get started, special thanks to Boruch Chalk for introducing me to the Ray Data library and its unique capabilities.

Toy Example

To facilitate our discussion, we will define a simple PyTorch (2.0) Vision Transformer-based classification model and train it on a synthetic dataset composed of random images and labels. The Ray AIR documentation includes a wide variety of examples that demonstrate how to build different types of training workloads using Ray AIR. The script we create here loosely follows the steps described in the PyTorch image classifier example.

Defining the Ray Dataset and Preprocessor

The Ray AIR Trainer API distinguishes between the raw dataset and the preprocessing pipeline that is applied to the elements of the dataset before feeding them into the training loop. For our raw Ray dataset we create a simple range of integers of size num_records. Next, we define the Preprocessor that we would like to apply to our dataset. Our Ray Preprocessor contains two components: The first is a BatchMapper that maps the raw integers to random image-label pairs. The second is a TorchVisionPreprocessor that performs a torchvision transform on our random batches, converting them to PyTorch tensors and applying a series of GaussianBlur operations. The GaussianBlur operations are intended to simulate a relatively heavy data pre-processing pipeline. The two Preprocessors are combined using a Chain Preprocessor. The creation of the Ray dataset and Preprocessor is demonstrated in the code block below:

import ray
from typing import Dict
import numpy as np
import torchvision.transforms as transforms
from ray.data.preprocessors import Chain, BatchMapper, TorchVisionPreprocessor

def get_ds(batch_size, num_records):
    # create a raw Ray tabular dataset
    ds = ray.data.range(num_records)

    # map a batch of integers to random image-label pairs
    def synthetic_ds(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        labels = batch['id']
        batch_size = len(labels)
        images = np.random.randn(batch_size, 224, 224, 3).astype(np.float32)
        labels = np.array([label % 1000 for label in labels]).astype(
            dtype=np.int64)
        return {"image": images, "label": labels}

    # the first step of the preprocessor maps batches of ints to
    # random image-label pairs
    synthetic_data = BatchMapper(synthetic_ds,
                                 batch_size=batch_size,
                                 batch_format="numpy")

    # we define a torchvision transform that converts the numpy pairs to
    # tensors and then applies a series of gaussian blurs to simulate
    # heavy preprocessing
    transform = transforms.Compose(
        [transforms.ToTensor()] + [transforms.GaussianBlur(11)]*10
    )

    # the second step of the preprocessor applies the torchvision transform
    vision_preprocessor = TorchVisionPreprocessor(columns=["image"],
                                                  transform=transform)

    # combine the preprocessing steps
    preprocessor = Chain(synthetic_data, vision_preprocessor)
    return ds, preprocessor

Note that the Ray data pipeline will automatically use all of the CPUs that are available in the Ray cluster. This includes the CPU resources that are on the GPU instance as well as the CPU resources of any additional auxiliary instances in the cluster.

Defining the Training Loop

The next step is to define the training sequence that will run on each of the training workers (e.g., GPUs). First we define the model using the popular timm (0.6.13) Python package and wrap it using the train.torch.prepare_model API. Next, we extract the appropriate shard from the dataset and define an iterator that yields data batches with the requested batch size and copies them to the training device. Then comes the training loop itself which is comprised of standard PyTorch code. When we exit the loop, we report back the resultant loss metric. The per-worker training sequence is demonstrated in the code block below:

import time
from ray import train
import ray.train.torch  # ensures the train.torch submodule is loaded
from ray.air import session
import torch.nn as nn
import torch.optim as optim
from timm.models.vision_transformer import VisionTransformer

# build a ViT model using timm
def build_model():
    return VisionTransformer()

# define the training loop per worker
def train_loop_per_worker(config):
    # wrap the PyTorch model with a Ray object
    model = train.torch.prepare_model(build_model())
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    # get the appropriate dataset shard
    train_dataset_shard = session.get_dataset_shard("train")

    # create an iterator that returns batches from the dataset
    # (prefetch_batches falls back to 1 if it is not set in the config)
    train_dataset_batches = train_dataset_shard.iter_torch_batches(
        batch_size=config["batch_size"],
        prefetch_batches=config.get("prefetch_batches", 1),
        device=train.torch.get_device()
    )

    t0 = time.perf_counter()

    for i, batch in enumerate(train_dataset_batches):
        # get the inputs and labels
        inputs, labels = batch["image"], batch["label"]

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # print statistics
        if i % 100 == 99:  # print every 100 mini-batches
            avg_time = (time.perf_counter()-t0)/100
            print(f"Iteration {i+1}: avg time per step {avg_time:.3f}")
            t0 = time.perf_counter()

    # report the loss of the final step once the loop has completed
    metrics = dict(running_loss=loss.item())
    session.report(metrics)

Defining the Ray Torch Trainer

Once we’ve defined our data pipeline and training loop, we can move on to setting up the Ray TorchTrainer. We configure the Trainer in a manner that takes into account the available resources in the cluster. Specifically, we set the number of training workers according to the number of GPUs and we set the batch size according to the memory available on our target GPU. We build our dataset with the number of records required to train for precisely 1000 steps.

import ray
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

def train_model():
    # we will configure the number of workers and the size of our
    # dataset according to the available resources
    num_gpus = int(ray.available_resources().get("GPU", 0))

    # set the number of training workers according to the number of GPUs
    num_workers = num_gpus if num_gpus > 0 else 1

    # we set the batch size based on the GPU memory capacity of the
    # Amazon EC2 g5 instance family
    batch_size = 64

    # create a synthetic dataset with enough data to train for 1000 steps
    num_records = batch_size * 1000 * num_workers
    ds, preprocessor = get_ds(batch_size, num_records)

    # apply the preprocessing pipeline to the raw dataset
    ds = preprocessor.transform(ds)
    trainer = TorchTrainer(
        train_loop_per_worker=train_loop_per_worker,
        train_loop_config={"batch_size": batch_size},
        datasets={"train": ds},
        scaling_config=ScalingConfig(num_workers=num_workers,
                                     use_gpu=num_gpus > 0),
    )
    trainer.fit()
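
As a quick sanity check of the dataset sizing in train_model, the sketch below reproduces the arithmetic in plain Python. It assumes that the dataset is split evenly across the training workers, which is our reading of the setup rather than something the script enforces. The helper names are hypothetical and are not part of the Ray API.

```python
def records_for_steps(batch_size: int, steps: int, num_workers: int) -> int:
    """Total records needed so that each worker trains for `steps` steps."""
    return batch_size * steps * num_workers


def steps_per_worker(num_records: int, batch_size: int, num_workers: int) -> int:
    """Steps each worker runs, assuming an even split across workers."""
    shard_size = num_records // num_workers
    return shard_size // batch_size


for workers in (1, 4):
    total = records_for_steps(batch_size=64, steps=1000, num_workers=workers)
    # prints the worker count, the total record count, and the steps per worker
    print(workers, total, steps_per_worker(total, 64, workers))
```

With a batch size of 64, a single worker needs 64,000 records and four workers need 256,000, so that each worker still sees 1000 batches.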

Deploy a Ray Cluster and Run the Training Sequence

We now define the entry point of our training script. It is here that we set up the Ray cluster and initiate the training sequence on the head node. We use the Environment class from the sagemaker-training library to discover the instances in the heterogeneous SageMaker cluster as described in this tutorial. We define the first node of the GPU instance group as our Ray cluster head node and run the appropriate command on all of the other nodes to connect them to the cluster. (See the Ray documentation for more details on creating clusters.) We program the head node to wait until all the nodes have connected and then start the training sequence. This ensures that Ray will utilize all of the available resources when defining and distributing the underlying Ray tasks.

import time
import subprocess
import ray
from sagemaker_training import environment

if __name__ == "__main__":
    # use the Environment() class to auto-discover the SageMaker cluster
    env = environment.Environment()
    if env.current_instance_group == 'gpu' and \
            env.current_instance_group_hosts.index(env.current_host) == 0:
        # the head node starts a ray cluster
        p = subprocess.Popen('ray start --head --port=6379',
                             shell=True).wait()
        ray.init()

        # calculate the total number of nodes in the cluster
        groups = env.instance_groups_dict.values()
        cluster_size = sum(len(v['hosts']) for v in list(groups))

        # wait until all SageMaker nodes have connected to the Ray cluster
        connected_nodes = 1
        while connected_nodes < cluster_size:
            time.sleep(1)
            resources = ray.available_resources().keys()
            connected_nodes = sum(1 for s in list(resources) if 'node' in s)

        # call the training sequence
        train_model()

        # tear down the ray cluster by stopping the Ray processes on the
        # head node ("ray down" applies to clusters started with "ray up")
        p = subprocess.Popen("ray stop", shell=True).wait()
    else:
        # worker nodes attach to the head node
        head = env.instance_groups_dict['gpu']['hosts'][0]
        p = subprocess.Popen(
            f"ray start --address='{head}:6379'",
            shell=True).wait()

        # utility for checking if the cluster is still alive
        def is_alive():
            from subprocess import Popen
            p = Popen('ray status', shell=True)
            p.communicate()
            return p.returncode

        # keep node alive until the process on head node completes
        while is_alive() == 0:
            time.sleep(10)
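
To make the node-role logic concrete, here is a minimal sketch that runs the same head-node test and cluster-size calculation on a hand-written dictionary. The dictionary only mimics the 'hosts' entries that the script reads from env.instance_groups_dict. The host names and the two helper functions are assumptions made for illustration.

```python
from typing import Dict, List


def cluster_size(instance_groups: Dict[str, Dict[str, List[str]]]) -> int:
    """Total number of hosts across all instance groups."""
    return sum(len(group["hosts"]) for group in instance_groups.values())


def is_head_node(current_group: str, current_host: str,
                 group_hosts: List[str]) -> bool:
    """The first host of the 'gpu' group acts as the Ray head node."""
    return current_group == "gpu" and group_hosts.index(current_host) == 0


# Hypothetical cluster layout: one GPU host and one CPU host.
example_groups = {
    "gpu": {"hosts": ["algo-1"]},
    "cpu": {"hosts": ["algo-2"]},
}

print(cluster_size(example_groups))  # 2
print(is_head_node("gpu", "algo-1", example_groups["gpu"]["hosts"]))  # True
print(is_head_node("cpu", "algo-2", example_groups["cpu"]["hosts"]))  # False
```

Under this layout, the head node waits until two nodes have registered with Ray before it calls train_model.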

Training on an Amazon SageMaker Heterogeneous Cluster

With our training script complete, we are now tasked with deploying it to an Amazon SageMaker Heterogeneous Cluster. To do this we follow the steps described in this tutorial. We start by creating a source_dir directory into which we place our train.py script and a requirements.txt file containing the two pip packages our script depends on, timm and ray[air]. These are automatically installed on each of the nodes in the SageMaker cluster. We define two SageMaker Instance Groups, the first with a single ml.g5.xlarge instance (containing 1 GPU and 4 vCPUs), and the second with a single ml.c5.4xlarge instance (containing 16 vCPUs). We then use the SageMaker PyTorch estimator to define and deploy our training job to the cloud.

from sagemaker.pytorch import PyTorch
from sagemaker.instance_group import InstanceGroup
cpu_group = InstanceGroup("cpu", "ml.c5.4xlarge", 1)
gpu_group = InstanceGroup("gpu", "ml.g5.xlarge", 1)

estimator = PyTorch(
    entry_point='train.py',
    source_dir='./source_dir',
    framework_version='2.0.0',
    role='<arn role>',
    py_version='py310',
    job_name='hetero-cluster',
    instance_groups=[gpu_group, cpu_group]
)

estimator.fit()
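
For reference, a requirements.txt along the following lines would match the package versions mentioned in this post (timm 0.6.13 and Ray 2.6.1). Pinning the versions is our own suggestion, so treat this as a sketch and not as the exact file used in the experiments.

```text
timm==0.6.13
ray[air]==2.6.1
```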

Results

In the table below we compare the runtime results of running our training script in two different settings: a single ml.g5.xlarge GPU instance and a heterogeneous cluster containing an ml.g5.xlarge instance and an ml.c5.4xlarge. We evaluate the system resource utilization using Amazon CloudWatch and estimate the training cost using the Amazon SageMaker pricing available as of the time of this writing ($0.816 per hour for the ml.c5.4xlarge instance and $1.408 for the ml.g5.xlarge).

Comparative Performance Results (By Author)

The relatively high CPU utilization combined with the low GPU utilization of the single instance experiment indicates a performance bottleneck in the data pre-processing pipeline. This bottleneck is clearly addressed when moving to the heterogeneous cluster. Not only does the GPU utilization increase, but so does the training speed. Overall, the price efficiency of training increases by 23%.
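
The cost side of this comparison can be sketched with a few lines of arithmetic. The hourly prices below are the ones quoted above. The step times are hypothetical placeholders chosen only to show the calculation, and they are not the measured results. The helper names are ours.

```python
GPU_PRICE = 1.408  # ml.g5.xlarge, USD per hour (quoted above)
CPU_PRICE = 0.816  # ml.c5.4xlarge, USD per hour (quoted above)


def cost_per_1000_steps(hourly_price: float, seconds_per_step: float) -> float:
    """Cost in USD of running 1000 training steps."""
    return hourly_price * seconds_per_step * 1000 / 3600


def break_even_speedup(base_price: float, extra_price: float) -> float:
    """Speedup at which the larger cluster costs the same per step."""
    return (base_price + extra_price) / base_price


print(f"{break_even_speedup(GPU_PRICE, CPU_PRICE):.2f}")  # 1.58

# Hypothetical step times, for illustration only (not measured results).
single = cost_per_1000_steps(GPU_PRICE, seconds_per_step=1.0)
hetero = cost_per_1000_steps(GPU_PRICE + CPU_PRICE, seconds_per_step=0.5)
print(f"{single:.3f} {hetero:.3f}")
```

Adding the CPU instance raises the hourly price from $1.408 to $2.224, so the heterogeneous cluster needs to train roughly 1.58 times faster than the single instance before it becomes the cheaper option per training step.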

We should emphasize that these toy experiments were created purely for the purpose of demonstrating the automated load balancing features enabled by the Ray ecosystem. It is possible that tuning of the control parameters may have led to improved performance. It is also likely that choosing a different solution for addressing the CPU bottleneck (such as choosing an instance from the EC2 g5 family with more CPUs) may have resulted in better cost performance.

Summary

In this post we have demonstrated how Ray datasets can be used to balance the load of a heavy data pre-processing pipeline across all of the available CPU workers in the cluster. This enables us to easily address CPU bottlenecks by simply adding auxiliary CPU instances to the training environment. Amazon SageMaker’s heterogeneous cluster support is a compelling way to run a Ray training job in the cloud, as it handles all facets of the cluster management and avoids the need for dedicated devops support.

Keep in mind that the solution presented here is just one of many different ways of addressing CPU bottlenecks. The best solution for you will depend heavily on the details of your project.

As usual, please feel free to reach out with comments, corrections, and questions.

I am a Machine Learning Algorithm Developer working on Autonomous Vehicle technologies at Mobileye. The views expressed in my posts are my own.