Hands-on Tutorials

Adapting to changes of data by building MLOps pipeline in Vertex AI

This post shows how to build a machine learning pipeline for object detection tasks. The objective is to demonstrate how to realize MLOps to prepare data drift situations using Vertex AI’s AutoML and Cloud Function.

Park Chansung
Towards Data Science
13 min readAug 11, 2021

--

Motivation

I have a three year old daughter, and I realized that her face is continuously changing so dramatically month by month. Google photo is already doing an excellent job to recognize faces by tracking changes in people’s faces. However, I figured that I could go even further like classifying pictures of her not wearing masks with layered predictions later. That would be a topic for the next article.

Also, I thought this is a great example for MLOps because my wife and I have taken lots of pictures of her just like usual parents. That means I have enough data to build a model, and the data already contains potential data drift since I have pictures of her in the age of infant, one, two, etc.

Building Pipeline and Trigger

We need Vertex AI, Google Cloud Storage, and Cloud Function to build a machine learning pipeline and a trigger. Vertex AI is not a single service, but a combination of lots of different AI related services. To be specific, the project for this post leverages Vertex AI’s Dataset, AutoML, Notebook, Pipeline, Model, and Endpoint features altogether.

Figure 1 shows a step by step instruction on how to build an entire pipeline. The best way to understand the blueprint of a machine learning system is to think about components consisting of the workflow. Let’s go through one by one. Just keep in mind that each component does not represent a standalone job but they are connected altogether in order.

Figure 1: Setup for the pipeline — Image by Author

Data Preparation

Data is the heart of machine learning, and we can’t do anything without it. So, there is no doubt that we need to prepare the dataset first. Vertex AI provides a managed dataset feature. You can simply import data from local storage, or you can just point to GCS location if you already have the dataset in an existing GCS bucket.

However, as you know, data itself is not enough. We need labels/annotations. Vertex AI Dataset lets you import labels directly at the time of importing the raw data. You just need to make sure the labels are formed in a suggested manner. You can find how to make your own labeling file here according to data types.

In order to give you a sense how it looks like, the below shows an example with a label for image classification task in CSV format(you can use JSONL as well).

[ML_USE], GCS_FILE_PATH,[LABEL]

and you can find another example for object detection in CSV format.

[ML_USE],GCS_FILE_PATH,[LABEL],[BOUNDING_BOX]*

Here, you can simply ignore ML_USE, but if you want to manually split data into training/validation/test sets, it can be either of training, validation, or test. The BOUNDING_BOX is a list of 8 values, and each two is a pair. So you can guess it represents coordinates of each edge of a bounding box. Basically, the order has to follow X_MIN, Y_MIN, X_MAX, Y_MIN, X_MAX, Y_MAX, X_MIN, Y_MAX.

Training

There are multiple options for training a model against your dataset. This post specifically shows how to leverage Vertex AI AutoML features to realize MLOps. Here are three reasons why I have chosen AutoML. First, I do not have to care about modeling. All I have to do is to prepare the dataset in the right format to be recognized by AutoML. Fortunately Vertex AI Dataset perfectly matches to Vertex AI AutoML, so there is no additional workload for this.

Second, when the dataset evolves, we can not guarantee that the current state-of-the-art model is good enough. We probably have to run multiple experiments by writing different versions of codes for data engineering, modeling, and hyper-parameter tuning. AutoML mostly shows the top notch results. That is because the internal algorithm is likely modified and maintained by Google engineers over time, and it potentially guarantees that we almost always leverage the reliable state-of-the-art AutoML modeling techniques.

Last but not least, It is simple to integrate Vertex AI AutoML into Vertex AI Pipeline. Vertex AI Pipeline is simply a wrapper service for Kubeflow Pipeline, and Google has defined a bunch Kubeflow components that are smoothly fused into the standard Kubeflow Pipeline. This means you can leverage Vertex AI AutoML while writing standard Python codes for custom components and connect them.

Deployment and Monitoring

Deployment can be seen as one operation combined with two separate operations, model export and serving an endpoint. Vertex AI supports both of them via Vertex AI Model and Endpoint. Vertex AI Model is a central place where all the trained models are managed along with their versions. You can view the trained results with metrics, test out a simple prediction with it.

Once you think you are ready to deploy the model for the real world users, you can create an endpoint with a chosen model. Actually, Vertex AI Endpoint manages the endpoint of the model in Google Kubernetes Engine. That means you don’t have to care about the scalability of the model. Only a couple of nodes can be served during the early stage of your business, but the number can grow smoothly when your business becomes too big to handle user requests with only a couple of nodes.

Figure 2: Model monitoring capability in Vertex AI Endpoints — Image by Author

Vertex AI Endpoint also provides monitoring functionality for predictions/second, requests/second, latency, and prediction error percentage. You need extra effort to handle concept/data drift issues, but it is sufficient to see if there are errors in prediction requests, if the prediction delays more than expected, if the throughput is not enough with Vertex AI. Vertex AI provides additional monitoring features for tabular model and custom trained model to inspect the model behaviour in depth for now, but it is likely that AutoML models are going to be supported in near future.

Pipeline and Trigger

You can do dataset creation, model training, endpoint instantiation, model deployment individually. However, it is much better to construct a pipeline doing all these jobs in a consistent manner. AutoML likely guarantees for you to have the best model. That means all we have to do is to prepare more data and trigger the pipeline when you witness model performance degradation.

Then what can trigger the pipeline run to learn a new dataset? Certainly, there should be an event listening system to detect the changes in the dataset. This is where Cloud Function comes in. Cloud Function can listen to modification events whenever there are changes to a designated GCS bucket. With this ability, we can simply run the pipeline when we record more data.

Initial Operational Workflow

As an initial phase to demonstrate MLOps, we need a base dataset. As shown in Figure 3, there are multiple steps to create a dataset. First, you need to choose a data type and task type. For this project, I have chosen “Image object detection” under the “IMAGE” category. Second, you can upload images from the local filesystem, or if you have images already been uploaded to a GCS bucket, you can simply choose it.

Figure 3: Creating Vertex AI Dataset — Image by Author

Also, if you have an extra label file, you can upload it from the same UI as in Figure 3. For this project, I didn’t have any labels, so I just clicked “CONTINUE” after uploading a bunch of images.

Figure 4: Labeling capability in Vertex AI Dataset — Image by Author

Luckily, Vertex AI Dataset provides nice labeling tools right in your browser as shown in Figure 4. With this functionality, I have labeled about 100 images by simply drag and drop the mouse position. After doing this, you have a complete dataset meaning the data and associated labels are stored in the GCS bucket.

Now we are ready to build our initial pipeline with the dataset. We can write pipelining codes directly in terminal or favorite IDEs, but it is often a good idea to run the initial pipeline within a Jupyter Notebook. Because it provides a nice interactive environment, we can edit and experiment by going back and forth of the code. Furthermore, Vertex AI Notebook lets you ignore all the hassles about authorizing process for the GCP since it is already running in the GCP environment.

Figure 5: Initial phase to construct and run a pipeline in Vertex AI Pipeline — Image by Author

Figure 5 shows how the workflow goes within a notebook for the initial pipeline run. As the first step, we need to import necessary libraries and set some required variables as shown in the code below.

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.dsl import pipeline
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
PROJECT_ID = “YOUR_GCP_PROJECT_ID”
REGION = “GCP_REGION_TO_RUN_PIPELINE”
PIPELINE_ROOT = “LOCATION_RUN_METADATA_IS_GOING_TO_BE_STORED”
DATASET_META_PATH = “LOCATION_DATASET_METADATA_IS_STORED”

We need three five libraries under the three package. The two blocks of code examples below show how they are used. The first code block shows how to define a pipeline with three pipeline components in it. Note that @component decorator is used to indicate the function pipeline is where the entire pipeline is defined to Kubeflow Pipeline. We can separate the three components into individual functions with @component decorator and hook them in the pipeline. However, I have put everything in one place to make this sample as simple as possible.

ImageDatasetCreateOp component is to import dataset which we have defined via Vertex AI Dataset. In order to instantiate this component, we need to tell three things which are GCP Project ID, GCS path where the label file is stored, and task type.

@pipeline(name=”my-pipeline”)
def pipeline(project: str = PROJECT_ID):
ds_op = gcc_aip.ImageDatasetCreateOp(
project=project,
display_name=”DATASET_NAME_TO_APPEAR”,
gcs_source=DATASET_META_PATH,
import_schema_uri=\
aiplatform.schema.dataset.ioformat.image.bounding_box,
)
training_job_run_op = gcc_aip.AutoMLImageTrainingJobRunOp(
project=project,
display_name=”my-daughter-od-training”,
prediction_type=”object_detection”,
model_type=”CLOUD”,
base_model=None,
dataset=ds_op.outputs[“dataset”],
model_display_name=”my-daughter-od-model”,
training_fraction_split=0.6,
validation_fraction_split=0.2,
test_fraction_split=0.2,
budget_milli_node_hours=20000,
)
endpoint_op = gcc_aip.ModelDeployOp(
project=project, model=training_job_run_op.outputs[“model”]
)

The next component is AutoMLImageTrainingJobRunOp. This is a unified component for every image related task. As you can see, you can specify the specific task type in the prediction_type argument. Also note model_type is set to “CLOUD”. This tells AutoML to figure out what kind of resulting model to produce. For example, you can set model_type differently to “CLOUD_LOW_LATENCY_1” if you want to get a lighter model with low latency. There are multiple choices, so please check out the API document for further information. For this project I just left it as the standard average model.

There are three more arguments to consider in the AutoMLImageTrainingJobRunOp component. You can specify training/validation/test split ratio directly. Although you can specify which images should belong to which dataset during the dataset preparation stage, if you explicitly set them within this component, it will ignore the information and randomly assign data according to the ratios. If you can’t decide carefully the splits by yourself, this is a good way to get started. The buget_milli_node_hours is a constraint when to stop training. Because AutoML can grow the size of a model infinitely if you train forever, you have to make a decision when to stop the training process. Otherwise, you are going to pay lots of money without any much accuracy gain. Lastly, which dataset the AutoML is going to be trained on has to be told, and that is done with the dataset argument. One important thing to know is that the dataset argument sets a connection and a dependency between ImageDatasetCreateOp and AutoMLImageTrainingJobRunOp since the training job has to be executed after the dataset creation operation.

The last component is ModelDeployOp. Despite the name, it has abilities to create an endpoint and deploy the trained model to the endpoint. We can explicitly do the two operations separately, but it is convenient to do them all with one component. All you need to do is specify what model to deploy, and that is set via model argument. And again, this argument sets a connection and a dependency between AutoMLImageTrainingJobRunOp and ModelDeployOp.

compiler.Compiler().compile(
pipeline_func=pipeline, package_path=PIPELINE_SPEC_PATH
)
api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)response = api_client.create_run_from_job_spec(
PIPELINE_SPEC_PATH,
pipeline_root=PIPELINE_ROOT,
parameter_values={“project”: PROJECT_ID},
)

The pipeline is fully defined, then we have to compile it with the compiler.Compiler().compile method. The role of compiling is to construct a pipeline specification by looking up the pipeline function definition. In the pipeline specification, lots of hidden pieces are recorded such as the job dependencies, which cloud machine type and which container image to be used, and more. By specifying package_path argument, the compiler outputs the pipeline specification in a JSON file. With the JSON file, all you need to do to run the pipeline is to pass the JSON file to the Vertex AI Client’s create_run_from_job_spec method. It is very important for automatic pipeline triggering and reusability.

After sending a request to Vertex AI with the JSON file, you will have the initial pipeline running in the Vertex AI Pipeline. When the pipeline run is finished, you can find out the trained model, the endpoint, the model which is deployed to the endpoint through Vertex AI Model, Endpoint UI Panels respectively.

Observing Unexpected Data Drift

It is trivial to build a simple application to send an image to the endpoint for prediction, but you can easily test your model with testing functionality in Vertex AI Model. I have used that feature since I can easily see the predicted result visually on the GCP console.

In order to test that the model should not be able to detect my daughter’s faces at the age of three as expected, I have given some images to the model as shown in Figure 6. However, I got unexpected results that the model has successfully detected her grown up faces.

Figure 6: Unexpected result (1). The trained model recognizes her face at the age of 3 even though it was trained on the pictures of infants — Image by Author

I have continued testing the model with the recent images, and I have realized that the COVID-19 pandemic had happened in the last year. I found lots of her images wearing masks, and there were a few images wearing sunglasses. This was the unexpected situation when I initially brainstormed the idea for this project, and I soon saw this situation as the data drift problem. As you can see in Figure 7, the trained model has captured the wrong location and different kids’ faces.

Figure 7: Unexpected result (2). There were lots of pictures that I had not expected — Image by Author

It is possible to run the pipeline manually after collecting and labeling new data. However, it would be much nicer to implement an automatic system to trigger pipeline run whenever we have new data because we have created the JSON specification file.

Figure 8 shows the workflow when data drift occurs. As you can see, we don’t need Vertex AI Notebook anymore, and the pipeline run is not executed directly from the notebook. Instead we can make and deploy a small function to be triggered on Cloud Function, and it listens to an event of changes in a designated GCS bucket. One thing to note is that there is a separate GCS bucket for storing the metadata of the final dataset. While labeling your dataset one by one, the metadata keeps changing frequently, and you don’t want the pipeline to be run every time when it changes. Instead, we can export the finalized metadata file into a separate GCS bucket when we think we are done.

Figure 8: Operational workflow when data drift occurs — Image by Author

The code block below shows all the codes for the Cloud Function. It is so simple since we already have the pipeline specification JSON file. vertex_ai_pipeline_trigger function gets called whenever there are any changes to any files belonging to a designated GCS bucket. Hence, we need to write a simple filtering conditional statement. The below code ensures to run the pipeline when there are any changes to a file with jsonl extension which is supported by Vertex AI Dataset when exporting.

from kfp.v2.google.client import AIPlatformClientPROJECT_ID = “YOUR_GCP_PROJECT_ID”
REGION = “GCP_REGION_TO_RUN_PIPELINE”
PIPELINE_ROOT = “LOCATION_RUN_METADATA_IS_GOING_TO_BE_STORED”
PIPELINE_SPEC_PATH = “LOCATION_PIPELINE_SPEC_IS_STORED”
def vertex_ai_pipeline_trigger(event, context):
print(‘File: {}’.format(event[‘name’]))
print(‘Extension: {}’.format(event[‘name’].split(“.”)[-1]))
if event[‘name’].split(“.”)[-1] == “jsonl”:
print(“target file extension”)
api_client = AIPlatformClient(
project_id=PROJECT_ID,
region=REGION
)
print(“api_client is successfully instantiated”)
response = api_client.create_run_from_job_spec(
PIPELINE_SPEC_PATH,
pipeline_root=PIPELINE_ROOT,
parameter_values={“project”: PROJECT_ID},
)
print(‘response: {}’.format(response))

Inside the conditional statement, the code to run the pipeline is exactly the same to what we have seen from the notebook section. You might wonder if we need extra authentication processes to access Vertex AI from other GCP services. However, it can be streamlined because Cloud Function and Vertex AI are both GCP services.

gcloud functions deploy YOUR_FUNCTION_NAME \
— trigger-resource YOUR_TRIGGER_BUCKET_NAME \
— trigger-event providers/cloud.storage/eventTypes/object.finzlize

After writing a python file containing the code block above, we can deploy it to Cloud Function with the shell command below. This command should be run in the same directory where the python file is located, and “YOUR_FUNCTION_NAME” should match to the actual function name defined in the python file. Please find more about this command in the official document. Also, make sure to include requirements.txt in the same directory with any necessary libraries in it. In this project, google-cloud-aiplatform was included to access Vertex AI APIs.

Updating Dataset to Complement

By writing and deploying the Cloud Function, collecting and labeling more data, and exporting the metadata to the appropriate GCS bucket are everything to get a new model for the updated dataset.

Figure 9: Collecting and labeling more data to cover mask wearing situations — Image by Author

Figure 9 shows that I have included more of my daughter’s images wearing masks. I have also included more of her photos recently taken so that the model could recognize her better.

Final Result

When clicking the exporting metadata button on the top right corner, the Cloud Function gets triggered, and it triggers the pipeline automatically.

Figure 10: After the model is retrained, it recognizes all the expected cases — Image by Author

Figure 10 shows the final model that I got with the newly collected dataset. As you can see it recognizes the faces of my daughter not only in the age of infant but also more recent times, and it also successfully recognizes her faces wearing masks even when there are other children in the same pictures.

Conclusion

In this post explored how to build a simple but extensible MLOps pipeline with Vertex AI with a practical use case. Vertex AI allows you to prepare your own dataset within a browser. You can play with the codes to interact with Vertex AI APIs with the notebook service. After fixing your code base, you can run the initial pipeline and create the JSON specification file which contains all the details on how to run the pipeline later without the actual code. Finally, you can integrate Cloud Function combined with GCS with the Vertex AI to be run automatically whenever there are any changes in your dataset.

--

--