End-to-End Databricks-Horovod Use Case
When starting a big machine learning project, there are many platforms to choose from. This article is a guide to training neural networks with TensorFlow and Horovod on Databricks.
Background
At SureWx (pronounced "Sure Weather"), we have developed custom weather forecasts for aviation applications. This fits into our suite of data-driven products for airlines that assist with operations from gate to takeoff. In the winter, operational decisions become complex since precipitation requires additional cleaning and safety measures before an aircraft can leave the ground. More accurate weather predictions help make sure aircraft depart on schedule. That’s why we’ve been training neural networks on data from our network of sensors around the world, combined with radar data, to predict short-term weather. This article contains the most important discoveries about the architecture and training of our models that we learned along the way.
1. Save Data in TFRecord Batches
Data is read from your S3 bucket, which is mounted to /dbfs on your Spark cluster. As long as your EC2 instances are in the same region as your S3 bucket, you only pay for GET requests to S3 (not for data transfer). GET requests can add up quickly if your data isn’t batched.
Unbatched Example:
$0.0004 / 1,000 requests × 200,000 files/epoch × 1,000 epochs = $80
$80 of S3 costs per model is a significant unexpected cost and can even exceed the GPU cost in some cases. If you have already built a tf.data.Dataset for training, it is easy to prebatch the data by writing it out as TFRecords; see the TensorFlow guide to writing and parsing TFRecords.
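If your samples are already in numpy arrays, writing them out as prebatched TFRecord files only takes a few lines. Here is a minimal sketch; write_tfrecord and its arguments are illustrative rather than our exact pipeline, but the feature names and dtypes match the parser in the next section:

import tensorflow as tf

def _bytes_feature(value):
    # Wrap raw bytes in a tf.train.Feature.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def write_tfrecord(filename, radar_batch, station_batch, label_batch):
    # Pack many samples into one .tfrecord file, so reading them back
    # costs a single S3 GET instead of one GET per sample.
    with tf.io.TFRecordWriter(filename) as writer:
        for radar, station, label in zip(radar_batch, station_batch, label_batch):
            features = tf.train.Features(feature={
                'radar': _bytes_feature(radar.astype('float32').tobytes()),
                'station_data': _bytes_feature(station.astype('float64').tobytes()),
                'label': _bytes_feature(label.astype('float32').tobytes()),
            })
            writer.write(tf.train.Example(features=features).SerializeToString())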
2. Interleave while Parsing
When you’re parsing, make sure to use interleaving with parallel calls for maximum performance. Also, for training to scale efficiently as you add more nodes, the tf.data.Dataset needs to have sharding applied before the files are parsed. That way each shard only reads its fraction of the files into memory.
import glob
import tensorflow as tf

# data_path, radar_shape, station_data_shape and label_shape are defined elsewhere in our pipeline.
def parse_fn(serialized):
    features = {
        'radar': tf.io.FixedLenFeature([], tf.string),
        'station_data': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.string)
    }
    # Parse the serialized data so we get a dict with our data.
    parsed_example = tf.io.parse_single_example(serialized=serialized, features=features)

    # Get the features as raw bytes.
    radar_raw = parsed_example['radar']
    station_data_raw = parsed_example['station_data']
    label_raw = parsed_example['label']

    # Decode the raw bytes into typed tensors and restore their shapes.
    radar = tf.io.decode_raw(radar_raw, tf.float32)
    radar = tf.reshape(radar, radar_shape)
    station_data = tf.io.decode_raw(station_data_raw, tf.float64)
    station_data = tf.reshape(station_data, station_data_shape)
    label = tf.io.decode_raw(label_raw, tf.float32)
    label = tf.reshape(label, label_shape)
    return (radar, station_data), label

# Shard the list of files before parsing so each worker only reads its fraction.
files = glob.glob(f'{data_path}/*.tfrecord')
files_ds = tf.data.Dataset.from_tensor_slices(files)
files_ds = files_ds.shard(num_shards, shard_index)
ds = files_ds.interleave(
    tf.data.TFRecordDataset,
    num_parallel_calls=tf.data.experimental.AUTOTUNE
).map(parse_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE)
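In the snippet above, num_shards and shard_index come from the worker topology when training with Horovod. Here is a minimal sketch of how they can be wired up and how the pipeline can be finished off, assuming horovod.tensorflow.keras; the shuffle buffer and batch_size are placeholders:

import horovod.tensorflow.keras as hvd

hvd.init()
num_shards, shard_index = hvd.size(), hvd.rank()  # one shard per Horovod worker

# After parsing, batch and prefetch so the GPUs are never waiting on input.
ds = ds.shuffle(1000).batch(batch_size).prefetch(tf.data.experimental.AUTOTUNE)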
3. Choose S3 paths to optimize S3’s sharding
Another reason to save data in batches is to avoid S3 throttling and network errors. You are limited to 5,500 GET requests per second per prefix in S3. This may seem like a lot, but if you are training many models at the same time you can hit this limit. It’s good practice to save the prebatched TFRecords with file paths that take advantage of S3’s sharding. See the guide below.
Understand S3 Object Key Naming Patterns for High Request Rates
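One common way to follow that guidance is to spread objects across many key prefixes, for example by prepending a short hash of the file name. This is a generic sketch rather than our exact layout:

import hashlib

def sharded_key(filename):
    # A two-character hash prefix spreads objects over up to 256 S3 prefixes,
    # so the per-prefix request limit is much harder to hit.
    prefix = hashlib.md5(filename.encode()).hexdigest()[:2]
    return f'{prefix}/{filename}'

sharded_key('train_000123.tfrecord')  # '<xx>/train_000123.tfrecord'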
4. Error handling
Training can be interrupted for a variety of reasons: bad data, instabilities in the training, errors in callbacks, spot instances being taken away… In order not to waste resources you want your model to checkpoint and recover.
callbacks = [
    keras.callbacks.ModelCheckpoint(f'{checkpoint_path}_{time}', save_weights_only=False, monitor='val_loss', mode='min', save_best_only=True),
    keras.callbacks.ModelCheckpoint(checkpoint_path, save_weights_only=False, save_best_only=False, save_freq='epoch', period=3),
]
The first callback saves the best model that comes out of the training session. The second saves the model every few epochs so that if training is interrupted, a new job can be started and the model loaded. Without both callbacks, you would throw away all the training done since the last best model. It’s easy to have two callbacks, so why waste GPU time?
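On the recovery side, the training job can simply check for the periodic checkpoint when it starts. A minimal sketch, assuming the callbacks list above, the ds built earlier, and a hypothetical build_model() helper; epochs is a placeholder:

import os
from tensorflow import keras

if os.path.exists(checkpoint_path):
    # Resume from the last periodic checkpoint written by the second callback.
    model = keras.models.load_model(checkpoint_path)
else:
    model = build_model()  # hypothetical function that builds a fresh model
model.fit(ds, epochs=epochs, callbacks=callbacks)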
5. Multi-Input Mixed Data
ETL that handles mixed data (e.g. images and time series) can be tricky to design. In our case, we combine sequences of radar images with time series data from our sensors. We use Spark to build a dataframe that aligns our images with the sensor data. For example, say we would like to match an image with the last 30 minutes of sensor data. We apply a window function on the time series data which packs each 30-minute window of sensor readings into a single byte array.
import pandas as pd
import numpy as np
import pyspark.sql.functions as f
from pyspark.sql import Window
from pyspark.sql.types import BinaryType
from pyspark.sql.functions import pandas_udf

df  # Spark DataFrame of sensor time series data, with a 'time' column.

# Stack the sensor data columns together and serialize them to bytes.
@pandas_udf(BinaryType())
def serialize_udf(s1: pd.Series, s2: pd.Series) -> bytes:
    """Takes multiple pandas Series, stacks them, and converts the resulting array to bytes."""
    data = np.dstack((s1.to_numpy(), s2.to_numpy()))
    data = data.astype(np.double)
    data = data[0, :, :]
    return data.tobytes()

# Collect the previous 30 minutes of readings for each row.
w = Window.orderBy(f.col("time").cast('long')).rangeBetween(-30 * 60, 0)
# config['sensor_cols'] lists the sensor column names to serialize.
df = df.withColumn('sensor_data', serialize_udf(*config['sensor_cols']).over(w))
pdf = df.toPandas()
The serialized time series data then needs to be parsed back into arrays before it can be loaded into a tf.data.Dataset.
# Decode each byte blob back into a numpy array of doubles.
sensor_data_bytes_list = list(pdf['sensor_data'].to_numpy())
sensor_data_list = list(map(lambda blob: np.frombuffer(blob, np.double), sensor_data_bytes_list))
# Stack one row per example (assumes every 30-minute window has the same length).
sensor_data_array = np.stack(sensor_data_list)
ds = tf.data.Dataset.from_tensor_slices(sensor_data_array)
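To feed a multi-input Keras model, this sensor dataset can then be zipped with datasets for the radar images and labels. A minimal sketch, where radar_array and label_array are hypothetical arrays built from the same dataframe:

sensor_ds = ds  # the sensor dataset built above
radar_ds = tf.data.Dataset.from_tensor_slices(radar_array)
label_ds = tf.data.Dataset.from_tensor_slices(label_array)

# Match the ((radar, station_data), label) structure returned by parse_fn above.
train_ds = tf.data.Dataset.zip(((radar_ds, sensor_ds), label_ds))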
Gotchas
- Running Horovod on Spark via HorovodRunner, you pay for one GPU instance that you can’t use for computation – this is the overhead cost of the Spark driver, which Horovod cannot use for training (see the sketch after this list).
- The TensorBoard profiler extension isn’t available on Databricks – if it is your first time designing a TensorFlow data pipeline, you are best off optimizing it on a single GPU server first.
- Horovod on Databricks is not elastic – training stops when spot instances are taken away. If you’re looking to run on spot pricing, you should expect a lot of interruptions to training.
- Acquiring g4dn EC2 spot instances is quite difficult (as of 2021). Databricks cannot run on g4ad instances, and the other EC2 GPU instance types aren’t as good value.
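For context, launching training through HorovodRunner looks roughly like this; a minimal sketch, assuming a user-defined train() function and four worker GPUs:

from sparkdl import HorovodRunner

# np is the number of worker GPUs; the Spark driver's GPU sits idle during training.
hr = HorovodRunner(np=4)
hr.run(train, checkpoint_path=checkpoint_path)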
Conclusion
Databricks has created a nice suite for keeping data, ETL and distributed training all in the same place. I hope these tips make your development easier! Good luck.