
Rapid transformer inference at scale in the Google Cloud Platform

Parallel inference on millions of strings using transformers with minimal infrastructure setup

Photo by Guillaume Jaillet on Unsplash

Transformers are taking the NLP world by storm. Every text-related task has benefited from them. The most popular library for working with transformers is Hugging Face's, and the easiest way to get started with Hugging Face is through Simple Transformers. There are countless tutorials online that will teach you how to fine-tune transformer models for classification, translation, named entity recognition, etc.

Recently, at an internal hackathon hosted by my employer, Truebill, I had to rapidly run inference on millions of strings using Simple Transformers. It turns out the process is not completely intuitive, so this post describes how to quickly set up inference at scale using Simple Transformers (it will work with plain Hugging Face with minimal adjustments) on the Google Cloud Platform. It assumes that you already have a model and are now looking for a way to rapidly use it at scale. It also assumes that you have a GCP account and some basic familiarity with the platform. This likely isn't the cheapest or most efficient way to run things in production, but if you need to process a batch of millions of strings in a pinch, this guide will help you do that with minimal setup time.

Resource Provisioning

First, you will need to provision some resources: a Notebook instance to control the pipeline cluster, and a cluster to do the heavy lifting. Both live in the AI Platform section of the GCP.

Start by creating a Notebook instance. If you’re not using it to train the model, the type doesn’t matter much, but if you are going to train the model in the Notebook instance, you will want the PyTorch one with a GPU. Make sure to check the Install NVIDIA GPU driver box if you pick the GPU option.

Then, provision the cluster. Go to AI Platform->Pipelines and click New Instance. Then click Configure, and create a new cluster for the pipeline.

Check the Allow access to cloud APIs checkbox!

Image by author

Click the create cluster button, finish the pipeline creation process, and then go to Kubernetes Engine->Clusters. Pick your newly created cluster and click the Add Node Pool button at the top. Check the Enable autoscaling checkbox, so that machines are shut down when not in use. Set the minimum node number to 0, and the maximum to however many machines you will need to run at once. It really helps to have a benchmark of how long your model inference takes; it will help you allocate the right number of nodes to finish the job in the time you have.
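For example, a back-of-the-envelope calculation like this (the throughput and deadline numbers are hypothetical placeholders, substitute your own benchmark) turns a benchmark into a maximum node count:

```python
import math

# Hypothetical numbers -- replace with your own benchmark.
total_strings = 3_000_000
strings_per_second_per_node = 40   # measured inference throughput on one GPU node
deadline_hours = 8

seconds_available = deadline_hours * 3600
nodes_needed = math.ceil(total_strings / (strings_per_second_per_node * seconds_available))
print(nodes_needed)  # use this as the maximum node count for the pool
```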

Note that the GCP has a default limit of 8 GPUs of a single type in a single region, so if you need more than 8, you'll have to either pick multiple zones or request that the limit be raised for your project. Note that not all regions have all types of GPUs, so see which zone has the type you need here.

Here's which GPUs US Central zones have available as of January 2021. Image by author

Open the Nodes tab in the menu on the left. You will want N1 nodes (they allow GPUs). Pick the CPU/Memory that your project requires, then click on the "CPU platform and GPU" button and pick the GPU that you need. Check the "I understand the limitations" checkbox.

You will most likely want to also check the Enable preemptible nodes checkbox. It drastically lowers the compute cost, but doesn't guarantee uptime and sets a 24-hour limit on each node. The node settings will look something like this:

Image by author

After that, open the Security tab on the left and pick Allow full access to all Cloud APIs (unless you want to manage access manually, which is always painful). Create the node pool.

Go back to your cluster. If it has an Upgrade available option as pictured below, click it and update to the latest version. After the cluster is done upgrading, go to the Nodes tab and upgrade the nodes. If the version of your GPU nodes is below 1.18, Transformers won't work on them because of driver incompatibility.

After you've updated the nodes, click the Connect button at the top of the cluster page. Click Run in Cloud Shell, and press Enter to run the connection command after the shell loads. Authorize as needed. Run the following command to install NVIDIA drivers on your nodes:

kubectl apply -f https://raw.githubusercontent.com/GoogleCloudPlatform/container-engine-accelerators/master/nvidia-driver-installer/cos/daemonset-preloaded.yaml

Voila! You’re done with resource provisioning. Now to the fun part.

Setting up Docker

Go to AI Platform -> Pipelines in GCP. Open the Pipelines Dashboard for your pipeline. Copy the beginning of the URL that it takes you to, up to and including googleusercontent.com/

Now go to the Notebook instance that you created at the very beginning and open JupyterLab. Paste the URL that you just copied and assign it to a variable. Then assign the ID of your GCP project to another variable and import some pipeline-related stuff. Your first cell will look like this:

PROJECT_ID='YOUR-GCP-PROJECT-ID'
HOST = 'https://YOUR-PIPELINES-URL-LOOKS-LIKE.pipelines.googleusercontent.com/'
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler
import kfp.components as comp
import kubernetes as k8s
import kfp
client = kfp.Client(host=HOST)
client.list_pipelines()

The last two lines aren’t necessary, but running them checks if your notebook is able to connect to your cluster. If it works, you’ll get a bunch of info printed about the cluster. If it doesn’t, there’s likely a lack of permissions somewhere.

Run the cell.

Now you need to either train your model in the notebook or upload your existing model to the notebook machine. Place it in a folder inside the pipeline folder.

Remember the name of your model folder. Image by author

Use this moment to check your model_args.json (right-click, open with editor). Make sure that use_multiprocessing and use_multiprocessed_decoding are both set to false. PyTorch uses a lot of shared memory, which doesn't play well with Kubeflow pipelines. It is possible that there is a way to use multiprocessing within the cluster, but I haven't found it yet (if you do, post it in the comments!). Make sure that n_gpu matches whatever you picked for the nodes you created.
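Rather than editing the file by hand, you could also patch it from a notebook cell. A minimal sketch (the helper name and the example path are hypothetical; point it at your own model folder):

```python
import json

def disable_multiprocessing(args_path, n_gpu=1):
    """Force single-process inference: multiprocessing clashes with
    shared-memory limits inside Kubeflow pods."""
    with open(args_path) as f:
        args = json.load(f)
    args["use_multiprocessing"] = False
    args["use_multiprocessed_decoding"] = False
    args["n_gpu"] = n_gpu  # must match the GPUs per node you provisioned
    with open(args_path, "w") as f:
        json.dump(args, f, indent=2)

# e.g. disable_multiprocessing("pipeline/YOUR_MODEL_FOLDER/model_args.json")
```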

The next cell creates a Dockerfile for your nodes. It will look like this (the example below has imports to work with data hosted in BQ and GCS buckets):

%%bash
mkdir -p pipeline
cat > ./pipeline/Dockerfile <<EOF
FROM nvidia/cuda:10.2-runtime-ubuntu18.04
RUN apt-get update && apt-get install -y \
python3 \
python3-pip
RUN pip3 install --upgrade pip
RUN pip3 install numpy
RUN pip3 install pandas
RUN pip3 install torch==1.6.0 torchvision==0.7.0
RUN pip3 install simpletransformers
RUN pip3 install pandas-gbq
RUN pip3 install google
RUN pip3 install google-cloud
RUN pip3 install google-cloud-bigquery
RUN pip3 install google-cloud-storage
RUN pip3 install gcsfs
WORKDIR /app
COPY . /app
EOF

Feel free to change some things around (specifically, make sure you have all the Python libraries your model needs). However, you might want to keep the same base image. The NVIDIA ones are set up to work with Kubeflow Pipelines, and if you pick a different image, your setup might get more complicated. Run this cell.

Your next cell uses Google Cloud Build to build your Docker image. The only things you might want to change here are the image name (it doesn't have to match anything) and the numerical value after --timeout. It sets the limit on how long Cloud Build has to build your image; if your image is hideously complex, it might take longer than the 20 minutes assigned here.

IMAGE_NAME="YOUR_IMAGE_NAME"
TAG="latest"
GCR_IMAGE="gcr.io/{PROJECT_ID}/{IMAGE_NAME}:{TAG}".format(
 PROJECT_ID=PROJECT_ID,
 IMAGE_NAME=IMAGE_NAME,
 TAG=TAG
)
APP_FOLDER='./pipeline/'
import subprocess
# ! gcloud builds submit --tag ${IMAGE_NAME} ${APP_FOLDER}
cmd = ['gcloud', 'builds', 'submit', '--tag', GCR_IMAGE, APP_FOLDER, '--timeout', '1200']
build_log = (subprocess.run(cmd, stdout=subprocess.PIPE).stdout[:-1].decode('utf-8'))
print(build_log)

Run the cell. It might take a while, and the process should result in a long output ending with SUCCESS.

Creating and running the pipeline

We’re now at the fun part. The next cell will have the function that each of your nodes will actually run. The function will have 4 parts:

  1. Imports
  2. Reading data
  3. Running inference
  4. Storing data

The exact code will depend on where your data is stored, what your model is, whether you need to do any pre-processing, etc. Here is what mine ended up looking like:

def predict(file_number,model_folder='YOUR_MODEL_FOLDER'):
  #Imports
  import pandas as pd
  import re
  from simpletransformers.t5 import T5Model
  #Read Data
  GCS_BUCKET='gs://MY-BUCKET'
  file='{}/input/batch_{}.csv'.format(GCS_BUCKET,file_number)
  df=pd.read_csv(file)
  #Run Inference
  model = T5Model("t5",model_folder)
  predictions = model.predict(df["text"].tolist())
  df['output']=pd.Series(predictions)
  #Filter out special characters (application logic)
  df['output'] = df['output'].apply(lambda x: re.sub("[^A-Za-z0-9 '-]+", '', str(x)))
  #Store the results
  df.to_csv('{}/output/batch_{}.csv'.format(GCS_BUCKET,file_number))
#This line is not a part of the function and is needed for the pipeline.
predict_container=comp.func_to_container_op(func=predict, base_image=GCR_IMAGE)

Yours will be different, so keep a few things in mind. Firstly, model_folder should match the name of the folder where your model lives in the notebook instance (just the model folder name, without "pipeline/"). Secondly, there should be some parameter which defines which chunk of the data to process. In my case, it is file_number, as the input data is split into many files. Yours could be a parameter in a BQ query, or something else.
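If your input starts out as one big dataframe, a hypothetical helper like this sketches one way to produce the numbered batch files the tasks pick up (the helper name, path template, and chunk count are assumptions, not part of the original setup):

```python
import pandas as pd

def split_into_batches(df, n_batches, path_template="batch_{:03d}.csv"):
    """Split a dataframe into numbered CSV chunks; each pipeline task
    later picks one up by its file_number."""
    batch_size = -(-len(df) // n_batches)  # ceiling division
    paths = []
    for i in range(n_batches):
        chunk = df.iloc[i * batch_size:(i + 1) * batch_size]
        path = path_template.format(i)
        chunk.to_csv(path, index=False)
        paths.append(path)
    return paths

# e.g. split_into_batches(df, 30, "gs://MY-BUCKET/input/batch_{:03d}.csv")
```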

As you can see above, Google Cloud Storage is well supported; you don't have to do any authentication. So is BigQuery: pd.read_gbq will work out of the box (if you use it, I highly recommend setting use_bqstorage_api to True; it works much faster that way).
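For instance, if your chunk parameter drives a BQ query rather than a file name, the read step could be swapped for something like this sketch (the project, dataset, table, and column names are all hypothetical):

```python
import pandas as pd

def build_chunk_query(project_id, chunk, n_chunks):
    """SQL that selects one of n_chunks shards via MOD on a numeric id column."""
    return (
        "SELECT id, text "
        "FROM `{p}.my_dataset.inputs` "
        "WHERE MOD(id, {n}) = {c}"
    ).format(p=project_id, n=n_chunks, c=chunk)

def read_chunk_from_bq(project_id, chunk, n_chunks=10):
    return pd.read_gbq(
        build_chunk_query(project_id, chunk, n_chunks),
        project_id=project_id,
        use_bqstorage_api=True,  # much faster for large result sets
    )
```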

Your model code will end up being different depending on which transformer you are using, and you might have your own custom application logic as well. Don't forget to store the results! The last line of the cell adds your function as a step in the pipeline. When you're done implementing your logic, run the cell.

The next cell defines the pipeline. Mine looks like this:

@dsl.pipeline(
    name='My Pipeline',
    description='What it does'
)
def my_pipeline():
  predict_memory = '12G'
  for i in range(0,10):

    predict_task1 = predict_container(file_number="00"+str(i)) \
        .set_memory_request(predict_memory).set_gpu_limit(1).apply(gcp.use_preemptible_nodepool())

    predict_task2 = predict_container(file_number="01"+str(i)) \
        .set_memory_request(predict_memory).set_gpu_limit(1).apply(gcp.use_preemptible_nodepool())
    predict_task2.after(predict_task1)

    predict_task3 = predict_container(file_number="02"+str(i)) \
        .set_memory_request(predict_memory).set_gpu_limit(1).apply(gcp.use_preemptible_nodepool())
    predict_task3.after(predict_task2)
my_pipeline_name = my_pipeline.__name__ + '.zip'
compiler.Compiler().compile(my_pipeline, my_pipeline_name)

Note a couple of things. There is a memory request; it should be slightly lower than the node memory you chose earlier. There is a GPU limit, which should be equal to the number of GPUs you use in a node. The .apply(gcp.use_preemptible_nodepool()) bit should be there if you picked preemptible nodes in the cluster setup.

Also note that there is a loop, and there are a few tasks within that loop. Every iteration of the loop is a request to start a node (it's fine to have more iterations in the loop than you have nodes provisioned; the cluster will just put nodes to work as they finish earlier tasks). In my case, I have allocated 0–10 nodes, so when I start this pipeline, the cluster will spin up 10 nodes at once. Then each node will do the tasks within the loop iteration in order. So, in my case, each of the 10 nodes will do 3 tasks one after another and then shut down. There is no need to have multiple tasks; you can have just one. I mostly like having multiple tasks per node so I can visually monitor the progress of the job at a glance.
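Before launching, it can be worth enumerating the file numbers a loop like the one above will generate, to confirm every batch file is covered exactly once. A quick sanity check mirroring that loop:

```python
# Enumerate the file_number values produced by the example pipeline loop:
# each iteration i schedules tasks for "00"+str(i), "01"+str(i), "02"+str(i).
file_numbers = []
for i in range(0, 10):
    for prefix in ("00", "01", "02"):
        file_numbers.append(prefix + str(i))

print(len(file_numbers))   # 30 tasks in total
print(file_numbers[:3])    # ['000', '010', '020']
```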

The next cell is the last one, it creates and starts a job in your pipeline! It looks like this:

client = kfp.Client(host=HOST)
arguments = {}
experiment = client.create_experiment("experiment name")
run_name = "test run"
run_result = client.run_pipeline(experiment.id, run_name, my_pipeline_name, arguments)

You don’t need to change anything here unless you want to name your experiment and run. That’s it! When you run the cell, the Notebook will run your job in the cluster and give you links to monitor the job.

Success! Image by author

One thing to keep in mind when monitoring the job: as the cluster spins up nodes, it will display things to the effect of "resources unavailable" as the status of your pipeline tasks. It’s OK, and the status should change in a few minutes as the nodes spin up.

As always, don’t forget to de-provision your cloud resources when you’re done with them! You can do it in the Kubernetes Engine->Clusters and AI Platform-> Notebooks parts of the GCP.

