Building an Open Source ML Pipeline: Part 2

Event-Driven Data Processing with Argo Events and Argo Workflows.

Bennett Lambert
Towards Data Science

--

1. Setting up Argo Events

To set up event-driven workflows we need to add another tool to our toolkit, namely Argo Events. Setting it up can be a bit tricky, but I provide the necessary YAML files on Github here. We will start with one of the project’s own examples just to make sure that everything is installed correctly, so once you have cloned the repository feel free to take a look at the contents of these manifests. The one modification you need to make is to replace the base64-encoded credentials in ‘minio-secret.yaml’ with your own in order for the event source to work.
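For reference, that secret is just a standard Kubernetes Secret holding base64-encoded Minio credentials. A minimal sketch looks like the following; the secret name and key names here are placeholders, so keep whatever names the manifests in the repository actually use:

# Sketch of a Minio credentials secret (names are placeholders).
# Encode the values with e.g.: echo -n 'minioadmin' | base64
apiVersion: v1
kind: Secret
metadata:
  name: artifacts-minio
  namespace: argo
type: Opaque
data:
  accesskey: bWluaW9hZG1pbg==   # base64 of your Minio access key
  secretkey: bWluaW9hZG1pbg==   # base64 of your Minio secret key

With the credentials in place, apply everything: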

kubectl apply -f argo-events.yaml -n argo
kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-events/stable/examples/eventbus/native.yaml
kubectl apply -f minio-secret.yaml -n argo
kubectl apply -f minio-event-source-example.yaml -n argo
kubectl apply -f minio-sensor-example.yaml -n argo

You may have noticed that Argo Workflows has a tab in the sidebar for events. If you navigate there now you should get an error about the service account not being able to list resources in this category. This is because the default Argo Workflows role does not include permissions for the Argo Events resources. To fix this, run:

kubectl apply -f argo-role.yaml -n argo
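If you are curious what that file is doing, the key part is extending the service account’s RBAC rules so that it can read the Argo Events CRDs. A rough sketch of the kind of rule involved (not the exact manifest from the repository) looks like this:

# Sketch of the extra RBAC rules the Argo UI needs to list
# Argo Events resources (role name is illustrative).
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: argo-events-read
  namespace: argo
rules:
  - apiGroups: ["argoproj.io"]
    resources: ["eventsources", "sensors", "eventbus"]
    verbs: ["get", "list", "watch"]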

Now when you navigate to the events tab you should see something similar to the screenshot below: our example event source -> sensor -> trigger setup. When we drop a file in our ‘openaq’ bucket, it triggers a whalesay workflow that prints the filename of the uploaded file.

Our example event source outlined in the Argo UI. Image by Author.

That was a lot of YAML, so let’s take a step back and talk about how Argo Events works and what each file is doing. Argo Events works by connecting ‘Event Sources’ with ‘Sensors’ that contain ‘Triggers’. Event sources consume events from external systems; in our case the event source consumes the notifications Minio generates when a file is put into the ‘openaq’ bucket. Sensors listen to specific event sources and define what should happen when an event occurs: that is the Trigger, and in our case the trigger submits an Argo Workflow. Argo Events is a really flexible framework: here we are using Minio as an event source, but it could just as easily be a Kafka topic or a Redis pub/sub channel.

Image from https://argoproj.github.io/argo-events/.
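To make this concrete, the Minio event source boils down to a manifest along the following lines. This is a simplified sketch rather than the exact file from the repository; the endpoint, secret name, and key names are assumptions and need to match your Minio deployment and the secret created earlier:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: minio
  namespace: argo
spec:
  minio:
    openaq-upload:                    # arbitrary name for this event
      bucket:
        name: openaq                  # bucket to watch
      endpoint: minio.argo.svc:9000   # assumed in-cluster Minio address
      insecure: true
      events:
        - s3:ObjectCreated:Put        # fire when an object is uploaded
      filter:
        prefix: ""
        suffix: ""
      accessKey:
        name: artifacts-minio         # secret holding the Minio credentials
        key: accesskey
      secretKey:
        name: artifacts-minio
        key: secretkey

The sensor side of the pair then points at this event source by name, which is the same pattern we will reuse for the webhook later on.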

We need to define our event source and sensor with a trigger template. To test, I used the example event source and sensor files from the Argo Events Github page. At first there were some problems, but I opened a Github issue and it was resolved within a couple of days. Really impressive response time from the team! Now if you port-forward Minio and open the ‘openaq’ bucket, you can upload a file and you should see the example event in action.

We will come back to Argo Events and defining our preprocessing templates, but first we need to generate an expectation suite for data validation. For this we will use Great Expectations. I’ve written an article previously about using Great Expectations with Argo Workflows and a lot of this will be lifted from there.

2. Generating an Expectation Suite in Argo Workflows

To generate our expectation suite we need representative data from the OpenAQ API, so we will grab data as far back as the API allows and build the suite from that. This should in principle be the same data we use to train our models, so I will save it into our Minio ‘data lake’ for future use. We can use Hera to write the workflow, which makes life a lot nicer, and we can reuse our extract and transform functions from part 1 with a little bit of modification. A future to-do here is to parametrize the extract function so that we don’t need to hardcode start and end dates for the API call.

We also need to create two more Minio buckets for our data validation efforts, because we don’t want to trigger our Argo Events pipeline when we upload Great Expectations files. I created a bucket called ‘validation’ and one called ‘train’: the Great Expectations suite goes in ‘validation’ and the raw data in ‘train’. When all is said and done, the code to generate expectations from OpenAQ data looks as follows:

Here we are making use of Argo Workflows’ ability to run tasks in parallel by defining the order of our tasks in these lines:

extract_task >> ge_task
extract_task >> store_task
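If you are more comfortable reading raw Argo manifests, these two lines describe a small DAG in which the expectation-generation and storage tasks both depend on the extract task and therefore run in parallel once it finishes. Roughly (the task and template names here are illustrative, not the exact ones Hera generates):

# Sketch of the DAG the Hera dependency declarations express.
dag:
  tasks:
    - name: extract
      template: extract
    - name: generate-expectations     # runs after extract
      template: generate-expectations
      dependencies: [extract]
    - name: store-raw-data            # runs in parallel with the task above
      template: store-raw-data
      dependencies: [extract]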

Before running the code, make sure you are port-forwarding the Argo server on localhost:2746 and Minio at localhost:9000. After submitting you should be able to see a successful pipeline run in the Argo UI:

Image by Author.

3. Setting up our Event-Driven Pipeline

In the final iteration of this pipeline, we would like to be able to trigger model re-training based on model metrics. For this I think the most flexible option is a webhook event source, which lets us trigger an Argo Workflow with an HTTP request. Briefly, our workflow will look like this:

  • Upon an HTTP trigger, retrieve historical OpenAQ data.
  • Transform data into tabular format.
  • Generate an expectation suite for the retrieved data.
  • Store raw data in our ‘train’ Minio bucket.

From there we can set up a Minio event source that will trigger a model training pipeline. This we will cover in the next article!

To avoid having to write our pipeline in YAML, we will make use of an Argo Workflows CRD, the Workflow Template. We can define workflow templates using Hera in Python and then reference the workflow template in our Argo Events definition.

Switching from a Workflow to a Workflow Template in Hera is very easy. Below is the entire code for completeness, but I will highlight the specific changes.

The changes occur in the imports and in lines 180–181.

ws = WorkflowTemplateService(host="https://localhost:2746", verify_ssl=False, token=TOKEN)
w = WorkflowTemplate("generate-expectations", ws, namespace="argo")
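The point of this change is that the code now registers a WorkflowTemplate resource on the cluster instead of submitting a one-off Workflow, which is what lets Argo Events reference it by name later. On the cluster the result looks roughly like this (trimmed sketch):

# Rough shape of the registered template (spec omitted).
# You can check that it exists with: kubectl get workflowtemplates -n argo
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: generate-expectations
  namespace: argo
# spec: entrypoint, templates, and the DAG are generated by Hera from our tasks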

Now all that is left is to define our webhook so that it submits this workflow template whenever it receives a POST request.

First let’s define our webhook event source.
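Something along these lines does the job. This is a sketch rather than the exact manifest from the repository, and the metadata name is arbitrary:

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: webhook
  namespace: argo
spec:
  webhook:
    retrain:               # name of the event
      port: "12000"        # port the event source pod listens on
      endpoint: /retrain   # URL path that accepts requests
      method: POST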

Here we create a webhook called retrain that is exposed at the endpoint /retrain on port 12000. Next let’s create our sensor:
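Conceptually, the sensor declares a dependency on the webhook event and a trigger that submits a workflow built from our stored template when the event fires. The sketch below shows the general shape; the metadata names are arbitrary, and the inner template name referenced inside the WorkflowTemplate (‘main’ here) is an assumption that has to match whatever Hera generated:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: webhook-sensor
  namespace: argo
spec:
  dependencies:
    - name: retrain-dep
      eventSourceName: webhook          # the event source defined above
      eventName: retrain
  triggers:
    - template:
        name: retrain-trigger
        argoWorkflow:
          operation: submit             # submit a new workflow when the event fires
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: generate-expectations-
              spec:
                entrypoint: run-generate-expectations
                templates:
                  - name: run-generate-expectations
                    steps:
                      - - name: call-stored-template
                          templateRef:
                            name: generate-expectations   # the WorkflowTemplate we registered
                            template: main                # assumed entrypoint template name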

You can see here that we make use of the workflow template we created earlier through the “templateRef” field. If we kubectl apply these two files, we should then have a ready-to-go event source + sensor pair.

Next, to test the setup, port-forward the event source pod:

kubectl -n argo port-forward webhook-eventsource-{randomstring} 12000:12000

We can then test it by sending a POST request to our pod at the /retrain endpoint. One interesting thing I learned is that I had to include a dummy payload for the workflow to trigger correctly, even though it takes no parameters.

curl -d '{"dummy":"dummy"}' -H "Content-Type: application/json" -X POST http://localhost:12000/retrain

Now we have a setup where we can kick off our pipeline in an event-driven manner, and we will continue to build it up in the same way. In the next article we will focus on the model training process and how we can couple Argo Workflows with MLflow for model training, experiment tracking, and model storage.
