Distributed Hyperparameter Tuning in Vertex AI Pipeline

A path to enabling distributed hyperparameter tuning in a GCP Vertex AI pipeline

Hang Yu
Towards Data Science


Photo by Marsha Reid on Unsplash

Introduction

Vertex AI pipelines offer a handy way to implement end-to-end ML workflows, from data collection to endpoint monitoring, with very little effort. For new users, the ease of development and deployment is largely thanks to the Vertex AI pipeline example offered by GCP.

Beyond its comprehensive demonstration of the essential components, the official example also shows how users can customize and extend the pipeline to their own needs. Among the possible extensions, one of the most exciting is distributed Hyperparameter Tuning (HPT), which can explore a huge search space and identify the best hyperparameters in a short time. At the time of writing, GCP recommends leveraging cloudml-hypertune and google_cloud_pipeline_components for this purpose and provides the corresponding tutorials:

GCP HPT task tutorial

HPT pipeline sample
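
In this setup, the training code of each trial reports its evaluation metric back to the Vertex AI HPT service via cloudml-hypertune, along these lines (a minimal sketch; evaluate_on_validation_set is a hypothetical helper and the metric tag must match the metric defined for the study):

import hypertune  # provided by the cloudml-hypertune package

# ... train the model, then compute a validation metric ...
accuracy = evaluate_on_validation_set(model)  # hypothetical helper

hpt = hypertune.HyperTune()
hpt.report_hyperparameter_tuning_metric(
    hyperparameter_metric_tag='accuracy',  # must match the metric name in study_spec_metrics
    metric_value=accuracy,
    global_step=1)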

However, these tutorials present distributed HPT as a standalone task/pipeline; they don't explicitly show how to integrate it into an existing Vertex AI pipeline such as the one in the Vertex AI pipeline example. That gap motivated me to share my successful attempt at bridging it. I believe this will benefit the many businesses that have built, or will build, their ML workflows on Vertex AI pipelines.

The major contribution of this blog is the integration of distributed HPT into a Vertex AI pipeline. Specifically, it demonstrates how to:

  1. Chain the data collection and preprocessing of a Vertex AI pipeline to the distributed HPT. In comparison, the GCP HPT task tutorial and HPT pipeline sample simplify data collection and processing by loading a static TensorFlow dataset in the training step.
  2. Optimize HPT results collection to avoid the Docker arg length limit. In the HPT pipeline sample, the complete HPT results of all trials are encoded as a string that is passed to a Docker task as an input argument for further processing. The risk is that, for a large search space, this string may exceed the length limit of a Docker input argument. A simple solution that merges those two components is explored in this article.
  3. Save the best parameters in Firestore. In the HPT pipeline sample, HPT runs the trials, saves the models, and deploys the best one, but it's unclear how to access the best parameters afterwards. This doesn't suit scenarios where HPT and training are expected to be decoupled. Thus, Firestore is explored as a place to save the best hyperparameters for later training jobs.
  4. Chain the distributed HPT to the training component and train the model using the best parameters. Instead of saving the model of every HPT trial as in the HPT pipeline sample, an alternative that re-trains and saves only the best model is explored, though whether this offers a better storage-compute tradeoff is open to debate and depends on the specific scenario.

Integrating distributed HPT into Vertex AI pipeline

Now, let's go through the major steps mentioned above. It is worth noting that the ML pipeline demonstrated here is largely based on the Vertex AI pipeline example and only minimal changes are made to enable HPT. For demonstration purposes, only two hyperparameters are tuned via grid search, as shown below.

SEARCH_SPACE = {"num_boost_round": [100, 200],
                "min_data_in_leaf": [5, 10]}

The Jupyter notebook of this work, which contains the end-to-end process to deploy the distributed HPT, is hosted in the repo below.

1. Chain data preprocessing to HPT

The first challenge I encountered is that the vertex-ai-samples tutorial hard-codes the data collection in the HPT container image called by the HyperparameterTuningJobRunOp class of google_cloud_pipeline_components.v1.hyperparameter_tuning_job, whereas in practice we may want to use the data collection and processing components already in the pipeline.

data, info = tfds.load(name='horses_or_humans', as_supervised=True, with_info=True)

However, HyperparameterTuningJobRunOp currently doesn't accept input data as an argument, which motivated me to find an alternative way to pass the data source. Fortunately, it turns out that HyperparameterTuningJobRunOp consumes worker_pool_specs, which contains the HPT container specification and supports passing input args to the HPT container.

worker_pool_specs = [
    {
        "machine_spec": {
            "machine_type": "n1-standard-4",
        },
        "replica_count": 1,
        "container_spec": {"image_uri": hpt_container_image_uri, "args": CMDARGS},
    }
]

Intuitively, this implies that the input data source can be passed as part of the args of the container_spec, and this approach was validated to work.

The code example of this operation is shown below. Specifically, a new pipeline component called worker_pool_specs is created to receive the input_dataset from the data processing component and generate the worker_pool_specs that is passed to HyperparameterTuningJobRunOp. In this way, data preprocessing and the core HPT module are chained, as depicted below. It's worth noting that training_data_schema, label, and features are also passed, as they are required by the training script of the Vertex AI pipeline example.

from kfp.v2.dsl import component, Input, Dataset  # kfp v2 DSL imports (adjust to your kfp SDK version)


@component
def worker_pool_specs(project_id: str,
                      data_region: str,
                      data_pipeline_root: str,
                      hpt_container_image_uri: str,
                      custom_job_service_account: str,
                      input_dataset: Input[Dataset],
                      input_data_schema: str) -> list:
    """
    Pass the preprocessed data uri to hpt as a worker pool argument. The vanilla hpt API
    doesn't support 'input data' so it's done this way.

    data_preprocess -> dataset.uri -> CMDARGS -> worker_pool_specs -> hpt
    """

    display_name = 'hpt-pipeline-template'
    fields = [field.split(':')[0] for field in input_data_schema.split(';')]
    label = fields[-1]
    features = ','.join(fields[0:-1])
    CMDARGS = [
        "--training_data_uri=" + str(input_dataset.uri),
        "--training_data_schema=" + input_data_schema,
        "--label=" + label,
        "--features=" + features
    ]

    # The spec of the worker pools including machine type and Docker image
    worker_pool_specs = [
        {
            "machine_spec": {
                "machine_type": "n1-standard-4",
            },
            "replica_count": 1,
            "container_spec": {"image_uri": hpt_container_image_uri, "args": CMDARGS},
        }
    ]

    return worker_pool_specs
Chain preprocess to HPT via worker-pool-specs component
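
On the container side, these args can be read with standard argparse before the trial's training code runs. The Vertex AI HPT service additionally passes each trial's hyperparameter values as extra command-line flags, so they can be declared in the same parser (a sketch; the flag names simply mirror CMDARGS and the search space above):

import argparse

parser = argparse.ArgumentParser()
# Flags built by the worker_pool_specs component
parser.add_argument('--training_data_uri', type=str, required=True)
parser.add_argument('--training_data_schema', type=str, required=True)
parser.add_argument('--label', type=str, required=True)
parser.add_argument('--features', type=str, required=True)
# Tuned hyperparameters are appended per trial by the HPT service
parser.add_argument('--num_boost_round', type=float, default=100)
parser.add_argument('--min_data_in_leaf', type=float, default=5)
args = parser.parse_args()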

2. Optimize HPT results collection

In the original HPT pipeline sample, the output of the HPT module, which is the resource name of the HPT job, is passed to the GetTrialsOp module to retrieve all the hyperparameters and their scores, so that the GetBestTrialOp module can find the best one, as shown below.

tuning_op = HyperparameterTuningJobRunOp(
    display_name=display_name,
    project=project,
    location=region,
    worker_pool_specs=worker_pool_specs,
    study_spec_metrics=study_spec_metrics,
    study_spec_parameters=study_spec_parameters,
    max_trial_count=max_trial_count,
    parallel_trial_count=parallel_trial_count,
    base_output_directory=base_output_directory,
)

trials_op = hyperparameter_tuning_job.GetTrialsOp(
    gcp_resources=tuning_op.outputs["gcp_resources"]
)

best_trial_op = hyperparameter_tuning_job.GetBestTrialOp(
    trials=trials_op.output, study_spec_metrics=study_spec_metrics
)

Currently, the GetTrialsOp module encodes the results of all HPT trials into a single string, as presented below.

Sample output of GetTrialsOp

When the search space is large, one risk observed in practice is that this long string may exceed the input argument length limit of the following Docker container, namely GetBestTrialOp:

job_spec.worker_pool_specs[0].container_spec.args; Message: Container args should contain less than 100k characters

To avoid this limitation, a hacky but effective approach is taken, though better options may exist. Basically, the source code of GetTrialsOp (see hyperparameter_tuning_job) is injected into that of GetBestTrialOp so that the two pipeline components merge into one, avoiding the long string being passed as a Docker input.

@component(
    packages_to_install=['google-cloud-aiplatform',
                         'google-cloud-pipeline-components',
                         'protobuf'], base_image='python:3.7')
def GetBestTrialOp(gcp_resources: str, study_spec_metrics: list) -> str:

    from google.cloud import aiplatform
    from google_cloud_pipeline_components.proto.gcp_resources_pb2 import GcpResources
    from google.protobuf.json_format import Parse
    from google.cloud.aiplatform_v1.types import study

    api_endpoint_suffix = '-aiplatform.googleapis.com'
    gcp_resources_proto = Parse(gcp_resources, GcpResources())
    gcp_resources_split = gcp_resources_proto.resources[0].resource_uri.partition(
        'projects')
    resource_name = gcp_resources_split[1] + gcp_resources_split[2]
    prefix_str = gcp_resources_split[0]
    prefix_str = prefix_str[:prefix_str.find(api_endpoint_suffix)]
    api_endpoint = prefix_str[(prefix_str.rfind('//') + 2):] + api_endpoint_suffix

    client_options = {'api_endpoint': api_endpoint}
    job_client = aiplatform.gapic.JobServiceClient(client_options=client_options)
    response = job_client.get_hyperparameter_tuning_job(name=resource_name)

    trials = [study.Trial.to_json(trial) for trial in response.trials]

    if len(study_spec_metrics) > 1:
        raise RuntimeError('Unable to determine best parameters for multi-objective'
                           ' hyperparameter tuning.')
    trials_list = [study.Trial.from_json(trial) for trial in trials]
    best_trial = None
    goal = study_spec_metrics[0]['goal']
    best_fn = None
    if goal == study.StudySpec.MetricSpec.GoalType.MAXIMIZE:
        best_fn = max
    elif goal == study.StudySpec.MetricSpec.GoalType.MINIMIZE:
        best_fn = min
    best_trial = best_fn(
        trials_list, key=lambda trial: trial.final_measurement.metrics[0].value)

    return study.Trial.to_json(best_trial)
GetTrialsOp injected into GetBestTrialOp to be one component
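
With the merged component in place, the GetTrialsOp step disappears and the pipeline wiring becomes a single call after the tuning op (a sketch; tuning_op refers to the HyperparameterTuningJobRunOp instance shown earlier):

# The merged component replaces the GetTrialsOp -> GetBestTrialOp chain
best_trial_op = GetBestTrialOp(
    gcp_resources=tuning_op.outputs["gcp_resources"],
    study_spec_metrics=study_spec_metrics,
)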

3. Save the best parameters in Firestore

In the HPT pipeline sample, each HPT trial saves its trained model and the best one gets deployed later on. However, this approach, which couples HPT and model training, has some limitations:

  1. The deployed model is trained during one of the HPT trials, yet not every training run needs HPT in practice. One example is a recommender system built with matrix factorization: the model needs to be retrained frequently on the latest user-item interactions, but HPT is not always needed. Therefore, an option that decouples training and HPT is desirable.
  2. Deploying a model straight out of HPT could lead to a biased evaluation, as HPT selects the best trial based on the validation data.

To this end, instead of saving the trained models, it is preferable to save the best HPT result to a database such as Firestore for later use. Once the best hyperparameters are stored, model training and HPT are decoupled: the best parameters can be reused to train models until a new round of HPT is needed. Besides, model evaluation can be improved by adding a separate test set when a model is trained.

The code below demonstrates how the best HPT result is saved to Firestore. Specifically, a pipeline component called best_hpt_to_args is defined to consume the best hyperparameters found by the GetBestTrialOp step discussed previously. The storage structure in Firestore is to be decided case by case; here, a timestamp is used to label each HPT pipeline run. Lastly, this function returns the string "true", which is preferred by the pipeline condition, to kick off the conditional model training discussed later on. The validation accuracy is also logged for the sake of observability, but this is entirely optional.

@component(packages_to_install=['google-cloud-firestore==2.3'])
def best_hpt_to_args(hpt_best: str,
                     project_id: str,
                     solution_name: str) -> str:
    """
    Write the best hpt params to firestore.
    We keep the output to chain this component to the conditional training
    """

    import json
    from datetime import datetime
    from google.cloud import firestore
    hpt_best = json.loads(hpt_best.replace("'", '"'))

    hpt_best_dict = {}

    for i in hpt_best['parameters']:
        hpt_best_dict.update({i['parameterId']: i['value']})

    for i in hpt_best['finalMeasurement']['metrics']:
        hpt_best_dict.update({i['metricId']: i['value']})

    db = firestore.Client(project=project_id)
    task_flag = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    db.collection(solution_name).document(task_flag).set(hpt_best_dict, merge=True)

    return "true"
Firestore example of the saved HPT result
Save best HPT result to firestore
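
In the pipeline definition, this component is simply chained to the merged GetBestTrialOp from step 2, roughly as follows (a sketch; the variable names are placeholders):

hpt_to_args_op = best_hpt_to_args(
    hpt_best=best_trial_op.output,          # best trial JSON from step 2
    project_id=project_id,
    solution_name='hpt-pipeline-template',
)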

4. Train models using the best hyperparameters

Finally, HPT is done. The last improvement I made is appending a conditional training task so that the best hyperparameters from the latest HPT run are used to update the model in production immediately. This step is entirely optional and subject to the specific use case. It's worth noting that this condition receives hpt_op.output, where hpt_op is a function that wraps all the HPT components from worker_pool_specs to best_hpt_to_args, so its output equals that of best_hpt_to_args. Please see the details in the notebook.

with dsl.Condition(
        hpt_op.output == "true",
        name="train_model"
):

    train_task = train_op(
        project_id=project_id,
        data_region=data_region,
        data_pipeline_root=data_pipeline_root,
        input_data_schema=training_data_schema,
        training_container_image_uri=training_container_image_uri,
        train_additional_args=train_additional_args,
        serving_container_image_uri=serving_container_image_uri,
        custom_job_service_account=custom_job_service_account,
        input_dataset=preprocess_task.outputs['output_dataset'],
        machine_type=machine_type,
        accelerator_count=accelerator_count,
        accelerator_type=accelerator_type,
        hptune_region=hptune_region,
        hp_config_max_trials=hp_config_max_trials,
        hp_config_suggestions_per_request=hp_config_suggestions_per_request,
        vpc_network=vpc_network)
Conditional training

In the training script (images/training/app.py), a function called get_best_param_values is implemented to collect the latest HPT result by querying Firestore. Depending on how the HPT pipelines are labeled, there are different ways to collect the HPT result of interest. The collected hyperparameters come back as a dictionary, so they can easily be used to train the model.

def get_best_param_values(project_id, solution_name='hpt-pipeline-template'):
    db = firestore.Client(project=project_id)

    docs = db.collection(solution_name).list_documents()
    doc_latest = max([doc.id for doc in docs])

    params_latest = db.collection(solution_name).document(doc_latest).get().to_dict()

    logging.info(f'Latest doc id {doc_latest}: {params_latest}')

    return params_latest


best_param_values = get_best_param_values(project_id=args.hp_config_gcp_project_id)
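
Since the tuned names (num_boost_round, min_data_in_leaf) look like LightGBM parameters, the returned dictionary can plausibly be merged into the booster configuration, roughly as sketched below (assuming a LightGBM training script; X_train, y_train, and the objective are placeholders, and Vertex AI stores parameter values as floats, hence the int() casts):

import lightgbm as lgb

# Placeholder for whatever training data the script has already prepared
train_set = lgb.Dataset(X_train, label=y_train)

params = {
    'objective': 'binary',  # assumed objective, adjust to the actual task
    'min_data_in_leaf': int(best_param_values['min_data_in_leaf']),
}

model = lgb.train(
    params,
    train_set,
    num_boost_round=int(best_param_values['num_boost_round']),
)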

Summary

Vertex AI pipelines on GCP provide a great platform to productionize ML solutions with high performance and flexibility. However, the existing tutorials offer limited coverage of how distributed HPT can be achieved. To fill that gap, this article demonstrated a successful attempt to integrate the distributed GCP HPT module into an existing Vertex AI pipeline. Specifically, four limitations that are overlooked by the existing tutorials have been addressed:

  1. Data input. Users can now run HPT on data that is preprocessed on the fly within the pipeline.
  2. HPT results collection. The optimized result collection makes it feasible to explore a large search space.
  3. HPT results storage. Saving HPT results in Firestore means training and HPT can be decoupled.
  4. Model training using the best HPT result. New models can now be trained using the saved HPT result.

I believe the improvements discussed above will greatly benefit industrial use cases of Vertex AI pipelines in scenarios that require fully automated, distributed HPT to optimize the predictive power of ML solutions running in production. For the detailed end-to-end implementation, please visit the notebook hosted in the repo, and feel free to get in touch with me on LinkedIn.

Thank you for reading!

All images unless otherwise noted are by the author.
