Making Sense of Big Data

Tensorflow multi-worker training on Google Cloud AI Platform

Szilárd Kálosi
Towards Data Science
8 min read · Nov 14, 2020


Image by author under CC BY-NC 4.0

In nearly every deep learning project, there comes a decisive moment when immense volumes of training data or a lack of processing power become the limiting factor for completing training in reasonable time. Applying appropriate scaling therefore becomes inevitable. Although scaling up, i.e. upgrading to more powerful hardware, might deliver a momentary remedy, it seldom offers the right scalability because it rapidly hits physical limits. Hence, we have no option but to scale model training out, namely to use additional machines. Fortunately, scaling out in the era of the cloud is no longer a hurdle.

When it comes to scaling out neural network training, there are two main approaches. The first is a strategy called model parallelism, in which the neural network itself gets split across multiple devices. This type of distribution is mainly used when a model has so many parameters that it would not fit on a single device, or when the input samples are so large that even computing activations becomes infeasible. For those interested in model parallelism for Tensorflow, there is an official solution called Mesh TensorFlow.

Another distribution strategy is called data parallelism, in which the model gets replicated on each node. The data then gets distributed in such a way that each replica sees a different part of it. This distribution strategy becomes particularly useful for cases with vast volumes of training data.

Since each replica receives a distinct slice of the training data at each step, the workers must communicate gradient updates in an orderly fashion. There are two modes of communication. The first is asynchronous training, in which the workers train their replicas independently. Asynchronous training typically uses dedicated parameter servers to which the workers report their gradient updates.

The other is synchronous training, in which the workers share the parameter updates after each training step. Synchronous training frequently relies on all-reduce algorithms, meaning all replicas store the reduced (updated) parameters rather than passing them to a central location.

The main benefits of asynchronous training are that no worker has to idle waiting for the slowest one and that it tolerates machine failures slightly better. The major disadvantage is that the parameter server needs a lot of network bandwidth to communicate efficiently with the workers. Operating more than one parameter server might solve the issue, but it also increases training costs.

The tradeoffs of synchronous training are the exact opposite. There is no need for additional machines to store the updated model parameters. On the other hand, waiting for the slowest worker can stall the process. Choosing workers of similar performance can reduce idling times. Employing an efficient network topology for sharing gradient updates is likewise essential, since redundant communication between workers can eliminate the benefits of synchronous training.

Multi-worker distributed Tensorflow

Tensorflow implements data parallelism strategies in its standard library through the tf.distribute.Strategy API. It supports both synchronous and asynchronous multi-worker training, although the latter has only limited support. As of writing this post, the asynchronous ParameterServerStrategy only supports the original tf.estimator.Estimator API. However, starting with version 2.4, there are efforts to make asynchronous training more broadly available.

Since the release of Tensorflow 2.0, the tf.keras.Model API has become the primary way of building neural networks, in particular those not requiring custom training loops. Newly developed distributed training strategies have likewise mostly focused on Keras models. Although several distributed training strategies have been implemented for Keras models, as of writing this article the only available multi-worker strategy is the synchronous MultiWorkerMirroredStrategy.

This strategy does a lot of heavy lifting for us. It chooses a suitable all-reduce implementation based on the available hardware and network topology; currently, the options are ring all-reduce and NVIDIA's NCCL. The strategy also automatically distributes the training dataset among the workers when training Keras models. The distribution happens either by sharding the dataset by files or by data. Sharding by files is preferred because each replica loads only the files assigned to it. It is, for example, available for TFRecord datasets, and storing training examples in TFRecord files is the most efficient way of processing large datasets. If, on the other hand, there is no way to shard by files, all workers read all the available data, yet they only process their assigned shards. Sharding by data can happen, for instance, when reading directly from BigQuery, or when there are fewer files than workers. The same sharding applies to validation and test datasets too.
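The auto-sharding behaviour can also be overridden explicitly through the tf.data options. Below is a minimal sketch, with an illustrative bucket and file name, that forces data-based sharding:

import tensorflow as tf

# Input pipeline as usual; the bucket and file names are illustrative.
dataset = tf.data.TFRecordDataset(["gs://my-bucket/train-00000.tfrecord"])

# The default AUTO policy shards by file when possible and by data otherwise;
# here we force data-based sharding explicitly.
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
dataset = dataset.with_options(options)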

The strategy can also provide fault tolerance through a Keras callback named BackupAndRestore. This callback backs up the model at the end of each epoch. If a worker gets interrupted, all other workers restart as well, and the training continues from the last finished epoch. Upon successful completion of training, the stored checkpoints get deleted.
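A minimal sketch of wiring the callback into training; the backup location is a placeholder, and in the 2.3/2.4 releases the callback still lives under the experimental namespace:

# Assuming a compiled Keras model and a tf.data training dataset already exist.
backup_callback = tf.keras.callbacks.experimental.BackupAndRestore(
    backup_dir="gs://my-bucket/backup")  # placeholder location

model.fit(train_dataset,
          epochs=10,
          callbacks=[backup_callback])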

Multi-worker configuration

The simplest way of configuring multi-worker training in Tensorflow is through an environment variable named TF_CONFIG. It is a JSON string that contains two keys, cluster and task. The former describes the topology and includes the addresses of the nodes; its value is identical across all the nodes. The latter is unique to each node and defines its role in the cluster. Once Tensorflow parses the environment variable, it starts gRPC servers based on the configuration.

The following example describes a cluster with two nodes, a chief and a worker.

TF_CONFIG
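A sketch of such a value, with placeholder hostnames and ports; the worker's copy differs only in the task section, where the type would be "worker":

{
  "cluster": {
    "chief": ["chief-node:2222"],
    "worker": ["worker-node:2222"]
  },
  "task": {"type": "chief", "index": 0}
}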

There is a designated node in this cluster with extra responsibilities, called the chief. It caters to tasks such as saving training checkpoints, writing summaries for TensorBoard and serialising the trained model. If there is no chief explicitly specified in the configuration, the worker with index 0 assumes the role. Of course, the chief node executes the same code as well.

Multi-worker model

Integrating the MultiWorkerMirroredStrategy with Keras models is simple, and there are only a few requirements. First, building and compiling the model must take place within the distribution strategy's scope. Second, the batch size must reflect the number of nodes. Commonly, the batch size gets adjusted by multiplying it by the number of nodes. Keeping the original batch size also works; however, the workers then receive smaller batches. These approaches are referred to as weak and strong scaling, respectively.

Multi-worker strategy scope
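A minimal sketch of the scope and the batch-size adjustment (weak scaling), using a per-worker batch size of 64 and the create_model helper shown below:

import tensorflow as tf

# In Tensorflow 2.3 the strategy still lives under the experimental namespace.
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

# Weak scaling: multiply the per-worker batch size by the number of replicas.
per_worker_batch_size = 64
global_batch_size = per_worker_batch_size * strategy.num_replicas_in_sync

# Building and compiling the model must happen inside the strategy's scope.
with strategy.scope():
    model = create_model()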

Another difference compared to regular Keras training is the requirement to specify the steps_per_epoch argument, which is otherwise optional. The number of steps per epoch is easily calculated: it is the number of training instances divided by the adjusted global batch size. The function create_model encapsulates the usual way of defining a model, including model compilation.
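For illustration, with a hypothetical training set of 100,000 examples, the training call could look like this:

num_train_examples = 100_000  # illustrative
steps_per_epoch = num_train_examples // global_batch_size

# train_dataset is the (auto-sharded) tf.data dataset from the earlier snippet.
model.fit(train_dataset,
          epochs=10,
          steps_per_epoch=steps_per_epoch)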

Create model helper method
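A minimal create_model sketch; the architecture is just a stand-in, not the model from the accompanying repository:

def create_model():
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation="relu", input_shape=(784,)),
        tf.keras.layers.Dense(10, activation="softmax"),
    ])
    model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model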

Saving a multi-worker model is a bit more elaborate. All the nodes must save the model, albeit to different locations, and finally all the nodes except the chief must delete their saved versions. The reason every worker has to participate is that all-reduce communication can still occur during saving; saving to distinct locations keeps the workers from overwriting each other.

Multi-worker model saving
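A sketch of the saving logic described above, with the chief writing to the final destination and the other workers writing to temporary locations that get removed afterwards; the paths are illustrative:

import json
import os

import tensorflow as tf

def _is_chief(task_type, task_index):
    # With AI Platform runtimes 2.1+ the chief gets its own task type;
    # otherwise worker 0 assumes the role.
    return task_type == "chief" or (task_type == "worker" and task_index == 0)

task = json.loads(os.environ["TF_CONFIG"])["task"]
model_path = "gs://my-bucket/model"  # illustrative destination

if _is_chief(task["type"], task["index"]):
    save_path = model_path
else:
    # Non-chief workers save to temporary, worker-specific locations.
    save_path = os.path.join(model_path, f"workertemp_{task['index']}")

# `model` is the trained multi-worker Keras model from the previous snippets.
# Every worker must call save() because all-reduce communication still occurs.
model.save(save_path)

# Afterwards, the temporary copies get removed.
if not _is_chief(task["type"], task["index"]):
    tf.io.gfile.rmtree(save_path)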

Multi-worker training on AI Platform

Google Cloud and Tensorflow work together harmoniously. Tensorflow has always enjoyed a prominent role on AI Platform. Furthermore, Google Cloud has been investing in new Tensorflow-related products; for instance, it recently introduced Tensorflow Enterprise with complementary support and managed services.

As we have seen, having TF_CONFIG set correctly is an essential ingredient of Tensorflow multi-worker training. For Tensorflow to pick it up, the environment variable must exist before model training starts. Fortunately, AI Platform takes care of setting it up based on the given training job configuration.

There are two ways of providing the job configuration. One is to set the respective options of the submission command. However, for infrequently changed properties, storing them in a YAML config file is more convenient. For successful multi-worker training, the required options are runtimeVersion, pythonVersion, scaleTier, masterType, workerType and workerCount.

Training config
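A sketch of such a config.yaml; the machine types and versions are only examples:

trainingInput:
  scaleTier: CUSTOM
  masterType: n1-standard-8
  workerType: n1-standard-8
  workerCount: 2
  runtimeVersion: "2.3"
  pythonVersion: "3.7"

The file is then passed to gcloud ai-platform jobs submit training via the --config flag.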

The above config file creates a cluster of three nodes consisting of a chief and two workers. The machine types may differ, although it is preferable to keep them identical across the training cluster. There are predefined machine tiers as well, but they all come with parameter servers; thus, setting scaleTier to CUSTOM is currently imperative.

The configuration also specifies the Tensorflow version via the runtimeVersion property. The runtimes are periodically updated, but it takes some time for newer Tensorflow versions to become supported. It is important to note that runtimes before version 2.1 do not set the chief node correctly. Hence, multi-worker training on AI Platform is only available for runtimes 2.1 and later.

There are situations, on the other hand, when being able to use not-yet-supported Tensorflow versions is crucial: for instance, when there are relevant new features, or when they need proper testing before moving to production. For such cases, Google AI Platform provides an option to use custom containers for training. Submitting custom containers for training is very similar to using the provided runtimes, though there are minor details to consider. For a correct TF_CONFIG, containerised Tensorflow models require the option useChiefInTfConfig set to true. Furthermore, the training configuration must specify the image URIs for both chief and worker; the containers are, of course, identical in both cases.

Custom container config
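A sketch of the corresponding custom-container config; the image URI is a placeholder:

trainingInput:
  scaleTier: CUSTOM
  masterType: n1-standard-8
  masterConfig:
    imageUri: gcr.io/my-project/trainer:latest
  workerType: n1-standard-8
  workerCount: 2
  workerConfig:
    imageUri: gcr.io/my-project/trainer:latest
  useChiefInTfConfig: true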

With both official runtimes and custom containers, AI Platform Training automatically uses the Cloud ML Service Agent credentials.

Testing locally

Before submitting a job to AI Platform, it is practical to test the correctness of the training configuration locally, as failed jobs get billed too. Fortunately, doing so with official runtimes is remarkably straightforward; simply execute

gcloud ai-platform local train --distributed --worker-count $N ...

The command sets up a correct TF_CONFIG for us. Alas, it also adds a parameter server to the configuration. Tensorflow will emit a warning, but the superfluous parameter server will not thwart the training.

For containers, setting a correct TF_CONFIG and running the containers is a bit more complicated, but not impracticable. The simplest way to achieve local testing is to use docker-compose for running the chief and worker containers as well as setting the environment variables.

Docker-compose configuration for local testing
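A sketch of such a docker-compose.yml; the image name and the trainer module are placeholders, and COMMAND gets substituted from the shell environment when docker-compose parses the file:

version: "3"
services:
  chief:
    image: gcr.io/my-project/trainer:latest
    command: sh -c "python -m trainer.task ${COMMAND}"
    environment:
      TF_CONFIG: '{"cluster": {"chief": ["chief:2222"], "worker": ["worker:2222"]}, "task": {"type": "chief", "index": 0}}'
  worker:
    image: gcr.io/my-project/trainer:latest
    command: sh -c "python -m trainer.task ${COMMAND}"
    environment:
      TF_CONFIG: '{"cluster": {"chief": ["chief:2222"], "worker": ["worker:2222"]}, "task": {"type": "worker", "index": 0}}'

The service names double as hostnames on the default compose network, so they can be reused directly in the cluster addresses of TF_CONFIG.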

The COMMAND variable is a string containing all the arguments to the training task. Once the docker-compose configuration is complete, simply run

docker-compose up

In both cases, if the training requires access to other Google Cloud services, such as Cloud Storage, defining GOOGLE_APPLICATION_CREDENTIALS pointing to the right service account key JSON file must take place before running local training.
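For example, for the gcloud-based local training (the key path is a placeholder):

export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json

For the containers, the variable must be set inside them, for example by mounting the key file as a volume.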

Next steps

Building neural networks is not a simple task in itself, and dealing with distributed computing on top of that can seriously curb the process. Fortunately, modern deep learning frameworks are built with scalability in mind and thus enable faster development. Moreover, thanks to the smooth compatibility between Tensorflow and Google Cloud, exploiting this scalability has never been closer to everyone's reach.

A simple example

Admittedly, this smoothness might not be apparent at first glance. For this reason, I have created a simple working example available on Github. Check it out for the practical details. Remember, the best way of learning is by doing.


