End-to-end Machine Learning with TFX on TensorFlow 2.x

Andre Tan
Towards Data Science
4 min read · Feb 1, 2020

I was pretty excited when TensorFlow 2.0 was released late last year. After all, it promised a more enjoyable developer experience than its 1.x counterpart (otherwise known as the deep learning equivalent of Java from hell). Keras layers with strong support for deploying models? Count me in!

My excitement was short-lived, however, when I realized that support for taking TF 2.0 models to production was not what I imagined it to be. Sure, TensorFlow Extended was an option, but it wasn’t a particularly pleasant one considering that the documentation had yet to catch up.

Still, I was impatient and eager to execute. What ensued was an arduous journey filled with tears and cheers, all in search of the one true prize that is a working machine learning pipeline. I present to you my r̶a̶n̶t̶s̶ findings in the hope that you will not have to go through what I did. Well, until the next release anyway.

What this is (not)

This article is in no way an authoritative piece on the way to build production-ready TF 2.0 pipelines. It illustrates one possible workflow for deploying ML models that accounts for memory constraints and training-serving skew (amongst other things). If these concepts sound foreign, I recommend reading Google’s Rules of Machine Learning. Prior experience with neural networks and the machine learning lifecycle would certainly be helpful.

Figure: An example of a machine learning lifecycle adopted at Gojek.

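You will learn how to install the dependencies, preprocess data using TF Transform, build a model using Keras layers and feature columns, and export the model so that it calls the Transform graph at serving time.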

To keep things short, I will only be showing code snippets.

1. Install dependencies

You will need the following Python packages. Since we will be using Apache Beam to run our TF Transform pipelines, let’s install that too.

pip install apache-beam==2.16.0 tensorflow==2.0.0 tensorflow-transform==0.15.0

2. Preprocess data using TF Transform

This guide assumes familiarity with TF Transform, which involves writing transformations in a preprocessing_fn to be executed by a Beam pipeline. Further information on how to get started can be found here.

Let’s write a simple transformation for a numerical variable age that imputes missing values with the mean and applies feature scaling.

import tensorflow as tf
import tensorflow_transform as tft


def preprocessing_fn(inputs):
    outputs = inputs.copy()

    # "age" is a SparseTensor; tft_mean and impute are defined further down
    age = outputs["age"]
    mean_age = tft_mean(age)
    age = impute(age, -1, mean_age)
    outputs["age"] = tft.scale_to_z_score(age)

    return outputs

Hold up! Doesn’t TF Transform support calculation of means using the tft.mean() Analyzer? Why do we need to write our own implementation, tft_mean? This is because TF Transform has a known issue where NaNs may produce unexpected results for some Analyzers. To address this, let’s calculate the mean using only the non-null (finite) values.

def tft_mean(tensor):
    # keep only the finite entries so that NaNs cannot distort the mean
    finite_indices = tf.math.is_finite(tensor.values)
    finite_values = tf.boolean_mask(tensor.values, finite_indices)
    return tft.mean(finite_values)
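
To see why the mask matters, here is a plain-Python sketch of the same idea with no TensorFlow involved. The function names and the sample ages are made up for illustration; it only shows the arithmetic, not how the Analyzer runs over a full dataset.

```python
import math


def naive_mean(values):
    """Arithmetic mean with no special handling of missing values."""
    return sum(values) / len(values)


def finite_mean(values):
    """Mean over finite entries only, mirroring the boolean mask in tft_mean."""
    finite = [v for v in values if math.isfinite(v)]
    return sum(finite) / len(finite)


ages = [20.0, float("nan"), 40.0]  # made-up sample with one missing age
print(naive_mean(ages))   # nan
print(finite_mean(ages))  # 30.0
```

A single NaN is enough to turn the naive mean into NaN, which would then flow into every imputed and scaled value downstream. Note that this sketch would divide by zero if every value were missing.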

Now, let’s impute missing values using the mean age. There are two ways of representing missing values: first, by exclusion through the use of a SparseTensor; second, by using some arbitrary value (e.g. -1 for numerical variables or NULL for categorical variables). Let’s account for both scenarios.

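def impute(tensor, missing, replacement):
    # rebuild the SparseTensor with shape [batch_size, 1] so it can be densified
    sparse = tf.sparse.SparseTensor(
        tensor.indices, tensor.values, [tensor.dense_shape[0], 1]
    )
    # scenario 1: entries absent from the SparseTensor take the replacement
    dense = tf.sparse.to_dense(sp_input=sparse, default_value=replacement)
    # scenario 2: entries equal to the sentinel value take the replacement
    return tf.where(tf.equal(dense, missing), replacement, dense)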
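
If the two scenarios are hard to picture, the following plain-Python sketch mimics them on ordinary lists. It is a simplification under my own assumptions: the sparse column is represented by a list of row indices plus a list of values, and the names to_dense and impute_reference are hypothetical helpers, not part of TF Transform.

```python
def to_dense(indices, values, num_rows, default):
    """Expand a sparse column (row indices + values) into one value per row."""
    dense = [default] * num_rows
    for row, value in zip(indices, values):
        dense[row] = value
    return dense


def impute_reference(indices, values, num_rows, missing, replacement):
    """Fill absent rows, then swap the sentinel value for the replacement."""
    dense = to_dense(indices, values, num_rows, replacement)
    return [replacement if v == missing else v for v in dense]


# Rows 0, 2 and 3 carry a value; row 1 is absent; row 3 uses the -1 sentinel.
ages = impute_reference(
    [0, 2, 3], [20.0, 40.0, -1.0], 4, missing=-1, replacement=30.0
)
print(ages)  # [20.0, 30.0, 40.0, 30.0]
```

A reference like this can be handy for working out expected values by hand before checking what the Beam pipeline produces.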

You can then run your TF Transform pipeline. Don’t forget to export the Transform graph so that we can reuse it later!

import tempfile

import apache_beam as beam
import tensorflow_transform.beam as tft_beam

with beam.Pipeline() as pipeline:
    with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
        # read raw data
        raw_data = pipeline | beam.io.ReadFromTFRecord(...)

        # apply transformation; raw_metadata (the metadata describing the
        # raw data) is assumed to be defined elsewhere
        transformed_data, transform_fn = (
            (raw_data, raw_metadata)
            | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn)
        )

        # export transform graph
        _ = transform_fn | tft_beam.WriteTransformFn("data/tft/")

3. Build a model using Keras layers and feature columns

It’s time for some feature engineering. TensorFlow feature columns provide a concise API for common operations such as one-hot encoding. Let’s define feature columns for a numerical variable age and a categorical variable country.

features = [
    tf.feature_column.numeric_column("age"),
    tf.feature_column.indicator_column(
        tf.feature_column.categorical_column_with_vocabulary_list(
            "country", ["America", "Japan", "China"],
        )
    ),
]
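
For readers new to one-hot encoding, here is a rough plain-Python picture of what the indicator column produces for country. The one_hot helper is hypothetical, and the all-zeros result for an unknown country reflects my understanding of the default settings (no out-of-vocabulary buckets), so treat it as an assumption rather than a guarantee.

```python
VOCABULARY = ["America", "Japan", "China"]


def one_hot(value, vocabulary=VOCABULARY):
    """Return a vector with 1.0 at the position of value and 0.0 elsewhere."""
    return [1.0 if value == entry else 0.0 for entry in vocabulary]


print(one_hot("Japan"))   # [0.0, 1.0, 0.0]
print(one_hot("France"))  # [0.0, 0.0, 0.0]
```

Together with the single numeric age column, this means each example reaches the network as four numbers.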

Using feature columns with the Keras functional API can be a little tricky. You will need to instantiate a Keras tensor using an Input layer.

from tensorflow.keras.layers import Input

feature_inputs = {
    "age": Input(name="age", shape=(), dtype=tf.float32),
    "country": Input(name="country", shape=(), dtype=tf.string),
}

We can now feed this into a Keras DenseFeatures layer and proceed to define our neural network architecture.

from tensorflow.keras import Model
from tensorflow.keras.layers import Dense, DenseFeatures

output_1 = DenseFeatures(features)(feature_inputs)
output_2 = Dense(16, activation="relu")(output_1)
preds = Dense(10, activation="softmax")(output_2)

model = Model(inputs=feature_inputs, outputs=preds)
# compile() configures the model in place and returns None, so don't reassign
model.compile(...)
model.fit(...)
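
As a quick sanity check on the architecture, the number of trainable parameters can be worked out by hand. The sketch below assumes the feature layer emits four values per example (one for age plus three for the one-hot country) and that the feature layer itself has no weights; dense_param_count is a hypothetical helper.

```python
def dense_param_count(num_inputs, units):
    """Weights plus biases of a fully connected layer."""
    return num_inputs * units + units


feature_width = 1 + 3  # numeric age + one-hot country over a 3-word vocabulary
hidden = dense_param_count(feature_width, 16)
output = dense_param_count(16, 10)
print(hidden, output, hidden + output)  # 80 170 250
```

If the summary of your own model reports a very different total, the feature columns are probably not producing the width you expect.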

4. Export model and call Transform graph

Congratulations on making it this far! After training our model, we will export our model as a SavedModel to deploy it. To prevent skew between training and serving, we’ll need to load the exported Transform graph to run the same transformations on serving inputs.

tft_dir = "data/tft/"  # location of exported transform_fn
tft_output = tft.TFTransformOutput(tft_dir)
tft_output.transform_raw_features(raw_serving_inputs)

In TensorFlow 1.x, you would include the above logic in the serving_input_receiver_fn when exporting your model. However, as we move away from the Estimator API in TF 2.x, this functionality is being deprecated. Instead, we will need to modify the serving signature of the Keras model by overriding the save() method.

class ExportModel(tf.keras.Model):
    def __init__(self, model, tft_dir):
        super().__init__()
        self.model = model
        self.tft_output = tft.TFTransformOutput(tft_dir)

    # SERVING_FEATURE_SPEC (the spec of the raw serving inputs) is assumed
    # to be defined elsewhere
    @tf.function(input_signature=[SERVING_FEATURE_SPEC])
    def serving_fn(self, inputs):
        transformed = self.tft_output.transform_raw_features(inputs)
        return {"preds": self.model(transformed)}

    def save(self, output_dir):
        signatures = {"serving_default": self.serving_fn}
        tf.saved_model.save(self, output_dir, signatures)


ExportModel(model, "data/tft/").save("data/model/1/")
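
Stripped of TensorFlow, the pattern here is simply a wrapper that bundles frozen preprocessing with the model, so callers only ever send raw features. The sketch below is a toy illustration of that idea: FrozenScaler, ServingWrapper and toy_model are hypothetical names, and the statistics are made up.

```python
class FrozenScaler:
    """Holds statistics computed once on the training data."""

    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, raw_age):
        return (raw_age - self.mean) / self.std


class ServingWrapper:
    """Bundles preprocessing with a model so callers send raw features."""

    def __init__(self, model, transform):
        self.model = model
        self.transform = transform

    def predict(self, raw_age):
        return self.model(self.transform(raw_age))


def toy_model(scaled_age):
    """Hypothetical stand-in for the trained network."""
    return 1.0 if scaled_age > 0 else 0.0


wrapper = ServingWrapper(toy_model, FrozenScaler(mean=30.0, std=10.0))
print(wrapper.predict(45.0))  # 1.0
print(wrapper.predict(25.0))  # 0.0
```

Because the mean and standard deviation are fixed at training time and travel with the model, a serving request cannot be scaled differently from the training data, which is the skew we are trying to avoid.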

We can then serve our model using TF Serving.
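
For illustration, a request to the TF Serving REST endpoint could be assembled as follows. This is only a sketch: the model name "model", the host and the feature values are placeholders of mine, 8501 is the port TF Serving conventionally uses for REST, and the code only builds the request without sending it. It also assumes the serving signature accepts raw inputs named age and country.

```python
import json


def build_predict_request(instances, model_name="model", host="localhost", port=8501):
    """Build the URL and JSON body for a TF Serving REST predict call."""
    url = f"http://{host}:{port}/v1/models/{model_name}:predict"
    body = json.dumps({"signature_name": "serving_default", "instances": instances})
    return url, body


# Raw, untransformed features: the exported signature applies the Transform graph.
url, body = build_predict_request([{"age": 25.0, "country": "Japan"}])
print(url)
print(body)
```

Notice that the payload carries the raw age and country rather than scaled or encoded values.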

Hope this helps. Ciao!
