End-to-end Machine Learning with TFX on TensorFlow 2.x
I was pretty excited when TensorFlow 2.0 was released late last year. After all, it promised a more enjoyable developer experience over its 1.x counterpart (otherwise known as the deep learning equivalent of Java from hell). Keras layers with strong support for deploying models? Count me in!
My excitement was short-lived, however, when I realized that support for taking TF 2.0 models to production was not what I imagined it to be. Sure, TensorFlow Extended was an option, but it wasn’t a particularly pleasant one considering that the documentation had yet to catch up.
Still, I was impatient and eager to execute. What ensued was an arduous journey filled with tears and cheers, all in search of the one true prize that is a working machine learning pipeline. I present to you my r̶a̶n̶t̶s̶ findings in the hope that you will not have to go through what I did. Well, until the next release anyway.
What this is (not)
This article is in no way an authoritative piece on the way to build production-ready TF 2.0 pipelines. It illustrates one possible workflow for deploying ML models that accounts for memory constraints and training-serving skew (amongst other things). If these concepts sound foreign, I recommend reading Google’s Rules of Machine Learning. Prior experience with neural networks and the machine learning lifecycle would certainly be helpful.
You will learn how to:
- Use TF Transform to perform feature imputation and scaling
- Build models using the Keras functional API and feature columns
- Export a model for TF Serving that reuses the Transform graph
To keep things short, I will only be showing code snippets.
1. Install dependencies
You will need the following Python packages. Since we will be using Apache Beam to run our TF Transform pipelines, let’s install that too.
pip install apache-beam==2.16.0 tensorflow==2.0.0 tensorflow-transform==0.15.0
2. Preprocess data using TF Transform
This guide assumes familiarity with TF Transform, which involves writing transformations in a preprocessing_fn to be executed by a Beam pipeline. Further information on how to get started can be found in the TF Transform documentation.
Let’s write a simple transformation for a numerical variable age that imputes missing values with the mean and applies feature scaling.
import tensorflow as tf
import tensorflow_transform as tft

def preprocessing_fn(inputs):
    outputs = inputs.copy()
    age = outputs["age"]
    # tft_mean and impute are helper functions defined further down
    mean_age = tft_mean(age)
    age = impute(age, -1, mean_age)
    outputs["age"] = tft.scale_to_z_score(age)
    return outputs
Hold up! Doesn’t TF Transform support calculation of means using the tft.mean() analyzer? Why do we need to write our own implementation, tft_mean? This is because TF Transform has a known issue where NaNs may produce unexpected results for some analyzers. To address this, let’s calculate the mean using only the finite (non-NaN) values.
def tft_mean(tensor):
    # tensor is a SparseTensor; keep only its finite values before averaging
    finite_indices = tf.math.is_finite(tensor.values)
    finite_values = tf.boolean_mask(tensor.values, finite_indices)
    return tft.mean(finite_values)
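To see why the filtering matters, here is a minimal plain-Python sketch of the same idea. It does not use TF Transform at all; finite_mean is a hypothetical helper that mirrors what tft_mean does with tf.math.is_finite and tf.boolean_mask, and the sample ages are made up.

```python
import math


def finite_mean(values):
    """Mean of the finite entries only; NaN and +/-inf are skipped."""
    finite = [v for v in values if math.isfinite(v)]
    if not finite:
        raise ValueError("no finite values to average")
    return sum(finite) / len(finite)


# Made-up sample with one missing age encoded as NaN.
ages = [20.0, float("nan"), 40.0]

naive = sum(ages) / len(ages)  # a single NaN turns the whole result into NaN
safe = finite_mean(ages)       # averages 20.0 and 40.0 only
```

The naive mean is poisoned by the single NaN, while the masked version only averages the values that are actually present.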
Now, let’s impute missing values using the mean age. There are two ways of representing missing values: first, by exclusion through the use of a SparseTensor; second, by using some arbitrary value (e.g. -1 for numerical variables or NULL for categorical variables). Let’s account for both scenarios.
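def impute(tensor, missing, replacement):
    # Rebuild the SparseTensor with an explicit [batch_size, 1] dense shape
    sparse = tf.sparse.SparseTensor(
        tensor.indices, tensor.values, [tensor.dense_shape[0], 1]
    )
    # Scenario 1: entries absent from the SparseTensor become the replacement
    dense = tf.sparse.to_dense(sp_input=sparse, default_value=replacement)
    # Scenario 2: entries equal to the sentinel value become the replacement
    return tf.where(tf.equal(dense, missing), replacement, dense)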
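As a sanity check, the whole transformation can be mirrored in plain Python. This is only a reference sketch under a few assumptions: absent values are modelled as None, the z-score uses the population standard deviation, and the sentinel is left out of the mean (the TF snippet above only filters out non-finite values). The function name impute_and_scale and the sample data are hypothetical.

```python
import statistics


def impute_and_scale(ages, missing=-1):
    """Impute absent (None) or sentinel ages with the mean, then z-score them."""

    def is_missing(value):
        return value is None or value == missing

    # Assumption: the mean is computed from the values that are actually present.
    present = [v for v in ages if not is_missing(v)]
    mean_age = statistics.mean(present)

    imputed = [mean_age if is_missing(v) else v for v in ages]

    # Assumption: population standard deviation for the z-score.
    std_age = statistics.pstdev(imputed)
    if std_age == 0:
        return [0.0 for _ in imputed]
    return [(v - mean_age) / std_age for v in imputed]


# Made-up sample: one absent value (None) and one sentinel (-1).
scaled = impute_and_scale([20.0, None, 40.0, -1])
```

Both kinds of missing value end up at exactly zero after scaling, because they were replaced by the mean before the z-score was applied.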
You can then run your TF Transform pipeline. Don’t forget to export the Transform graph so that we can reuse it later!
import tempfile

import apache_beam as beam
import tensorflow_transform.beam as tft_beam

with beam.Pipeline() as pipeline:
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        # read raw data
        raw_data = pipeline | beam.io.ReadFromTFRecord(...)
        # apply transformation
        # (raw_metadata describes the schema of the raw data and must be defined beforehand)
        transformed_data, transform_fn = (
            (raw_data, raw_metadata) | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)
        )
        # export transform graph
        _ = (
            transform_fn | tft_beam.WriteTransformFn("data/tft/")
        )
3. Build a model using Keras layers and feature columns
It’s time for some feature engineering. TensorFlow feature columns provide a concise API for common operations such as one-hot encoding. Let’s define feature columns for a numerical variable age and a categorical variable country.
features = [
    tf.feature_column.numeric_column("age"),
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_list(
            "country", ["America", "Japan", "China"],
        )
    ),
]
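If one-hot encoding is unfamiliar, the following plain-Python sketch shows roughly what the indicator column produces for country. It is an illustration rather than the feature column implementation: one_hot is a hypothetical helper, and it assumes the default out-of-vocabulary handling, where unknown values are ignored and yield an all-zero vector.

```python
VOCABULARY = ["America", "Japan", "China"]


def one_hot(value, vocabulary=VOCABULARY):
    """One-hot encode value; out-of-vocabulary values become all zeros."""
    return [1.0 if value == term else 0.0 for term in vocabulary]


japan = one_hot("Japan")     # position 1 in the vocabulary is set
unknown = one_hot("France")  # not in the vocabulary, so no position is set
```

The order of the vocabulary list determines which position is set, so it needs to stay the same between training and serving.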
Using feature columns with the Keras functional API can be a little tricky. You will need to instantiate a Keras tensor using an Input layer.
from tensorflow.keras.layers import Input

feature_inputs = {
    "age": Input(name="age", shape=(), dtype=tf.float32),
    "country": Input(name="country", shape=(), dtype=tf.string),
}
We can now feed this into a Keras DenseFeatures layer and proceed to define our neural network architecture.
from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, DenseFeatures

output_1 = DenseFeatures(features)(feature_inputs)
output_2 = Dense(16, activation="relu")(output_1)
preds = Dense(10, activation="softmax")(output_2)
model = Model(inputs=feature_inputs, outputs=preds)

# compile() returns None, so do not reassign model here
model.compile(...)
model.fit(...)
4. Export model and call Transform graph
Congratulations on making it this far! After training our model, we will export our model as a SavedModel to deploy it. To prevent skew between training and serving, we’ll need to load the exported Transform graph to run the same transformations on serving inputs.
tft_dir = "data/tft/"  # location of exported transform_fn

tft_output = tft.TFTransformOutput(tft_dir)
tft_output.transform_raw_features(raw_serving_inputs)
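The reason for reloading the exported graph is that the statistics computed during training must be the ones applied at serving time. A minimal plain-Python sketch of that idea follows. It is an analogy, not how TF Transform stores its graph: analyze, transform and the stats.json file are hypothetical names, and the sample ages are made up.

```python
import json
import os
import tempfile


def analyze(ages):
    """Training-time pass: compute the statistics once over the full dataset."""
    mean = sum(ages) / len(ages)
    variance = sum((a - mean) ** 2 for a in ages) / len(ages)
    return {"mean": mean, "std": variance ** 0.5}


def transform(age, stats):
    """Apply the stored statistics; used unchanged for training and serving."""
    return (age - stats["mean"]) / stats["std"]


# Training: analyze once and persist the result (hypothetical stats.json file).
stats_path = os.path.join(tempfile.mkdtemp(), "stats.json")
with open(stats_path, "w") as f:
    json.dump(analyze([20.0, 30.0, 40.0]), f)

# Serving: load the persisted statistics instead of recomputing them per request.
with open(stats_path) as f:
    serving_stats = json.load(f)

scaled = transform(30.0, serving_stats)
```

Recomputing the mean from a single serving request would always scale it to zero (or fail), which is exactly the kind of training-serving skew the exported Transform graph avoids.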
In TensorFlow 1.x, you would include this transform_raw_features call in the serving_input_receiver_fn when exporting your model. However, as we move away from the Estimator API in TF 2.x, this functionality is being deprecated. Instead, we will need to modify the serving signature of the Keras model by overriding the save() method.
class ExportModel(tf.keras.Model):
    def __init__(self, model, tft_dir):
        super().__init__()
        self.model = model
        self.tft_output = tft.TFTransformOutput(tft_dir)

    # SERVING_FEATURE_SPEC (the spec of the raw serving inputs) must be defined beforehand
    @tf.function(input_signature=[SERVING_FEATURE_SPEC])
    def serving_fn(self, inputs):
        # apply the same transformations that were used during training
        transformed = self.tft_output.transform_raw_features(inputs)
        return {"preds": self.model(transformed)}

    def save(self, output_dir):
        signatures = {"serving_default": self.serving_fn}
        tf.saved_model.save(self, output_dir, signatures)

ExportModel(model, "data/tft/").save("data/model/1/")
We can then serve our model using TF Serving.
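Once the model is being served, a prediction request over TF Serving's REST API might be put together as in the sketch below. Treat the details as assumptions: the model name my_model and the host are hypothetical, 8501 is the REST port commonly used by TF Serving, and the input keys depend on how SERVING_FEATURE_SPEC is defined. The sketch only builds the request and does not send it.

```python
import json


def build_predict_request(instances, model_name="my_model",
                          host="localhost", port=8501):
    """Return the URL and JSON body for a TF Serving REST predict call."""
    # model_name, host and port are placeholders for your own deployment.
    url = f"http://{host}:{port}/v1/models/{model_name}:predict"
    body = json.dumps(
        {"signature_name": "serving_default", "instances": instances}
    )
    return url, body


# Raw, untransformed features: the exported signature applies the Transform graph.
url, body = build_predict_request([{"age": 42.0, "country": "Japan"}])
```

Note that the request carries raw feature values. Imputation, scaling and encoding all happen inside the exported serving signature.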
Hope this helps. Ciao!