Build your Data Pipeline on Kubernetes using Kubeflow Pipelines SDK and Argo Workflows

Running K8-native data pipelines

Lior Shkiller
Towards Data Science

--

The importance of a good data pipeline

How much effort is spent building the elements of your Machine Learning pipeline? Reference “Hidden Technical Debt in Machine Learning Systems” — Sculley et al.

For those of you who haven’t seen the diagram above, I highly recommend reading the paper “Hidden Technical Debt in Machine Learning Systems”. It covers best practices for building machine learning systems. One of the sections in the paper is about ML-system anti-patterns and pipeline jungles:

Pipeline Jungles. As a special case of glue code, pipeline jungles often appear in data preparation. These can evolve organically, as new signals are identified and new information sources added incrementally. Without care, the resulting system for preparing data in an ML-friendly format may become a jungle of scrapes, joins, and sampling steps, often with intermediate files output. Managing these pipelines, detecting errors and recovering from failures are all difficult and costly.

How to avoid pipeline jungles

Use a workflow engine.

There are a lot of workflow engines that help with pipeline orchestrations and building ETLs. I won’t get into the pros and cons of each framework, because that would take a whole different blog post and actually, I don’t think there is a clear winner.

The purpose of this blog post is to show you how to use Kubeflow Pipelines SDK to run Argo Workflows.

Why Argo Workflows?

The simple answer is that it’s cloud-native, which means that if you already have a Kubernetes cluster running, Argo is implemented as a Kubernetes CRD and allows you to run pipelines natively on your cluster. With Argo, each task executes in a pod and you can easily execute multiple tasks as a DAG. It contains many important features such as passing artifacts between tasks, parameterization, scheduling and more.

How do I run Argo Workflows?

The way you run Argo Workflows is by using a YAML configuration file.
Here is an example of a running a simple “hello world” task that runs a python docker image and prints “hello world”.

apiVersion: argoproj.io/v1alpha1
kind: Workflow # new type of k8s spec
metadata:
generateName: hello-world- # name of the workflow spec
namespace: default
spec:
entrypoint: hello-world-template # invoke the template
templates:
- name: hello-world-template # name of the template
container:
image: python:latest
command: ["python","-c"]
args: ["print('hello world')"]

Argo Workflow engine has a user interface with the following features:

  • Monitor and run Argo Workflows
  • View container logs, environment variables, task parameters and outputs
  • View and run cron workflows
Argo Workflows UI

So why do we need Kubeflow Pipelines SDK?

YAML has its limitations, especially when you want to run pipelines with many tasks and do fast iterations. For this reason, various Argo SDKs are currently being built that will grant you the ability to programmatically define Argo Workflows in Python and translate your code to the Argo YAML specification.

One of the most mature SDKs was built under the Kubeflow project. Kubeflow is an open, community-driven project to make it easy to deploy and manage an ML stack on Kubernetes. Companies including Google, Cisco, IBM, Microsoft, Red Hat, Amazon Web Services and Alibaba are among those using it in production. It has a loosely coupled, microservice architecture.

One of those services is Kubeflow Pipelines (KFP), which is a platform for building and deploying portable, scalable machine learning (ML) workflows based on Docker containers. It has a user interface for managing and tracking experiments, jobs, and runs.

There is a subtle distinction between Argo Workflows and KFP. Argo is the workflow engine behind KFP and KFP is meant mainly for ML- related usages.

Unlike Argo, ML-related usages have been the only focus for Kubeflow Pipelines; it’s not targeted for other data-processing tasks.

Where do ML-related usages begin and end?
I found Argo to be more natural for tasks like data-ingestion and general data-processing pipelines that are not meant to end with a running ML experiment.

Since Argo is the workflow engine behind KFP, we can use the KFP python SDK to define Argo Workflows in Python. The KFP SDK provides a set of Python packages that you can use to specify and run your workflows. Those pipelines will be compiled to the Argo YAML specification. You can use it by simply installing the package withpip install kfp.

An Example Workflow

In the following example, I would like to show you how to write a simple pipeline with KFP python SDK. The pipeline will receive a parameter, run a for-each loop and transfer data between tasks (The general building blocks of most data-processing pipelines). It’s written using KFP python SDK and will be compiled to an Argo YAML configuration.

import kfp@kfp.components.func_to_container_op
def print_func(param: int):
print(str(param))
@kfp.components.func_to_container_op
def list_func(param: int) -> list:
return list(range(param))
@kfp.dsl.pipeline(name='pipeline')
def pipeline(param: int):
list_func_op = list_func(param)
with kfp.dsl.ParallelFor(list_func_op.output) as param:
print_func(param)
if __name__ == '__main__':
workflow_dict = kfp.compiler.Compiler()._create_workflow(pipeline)
workflow_dict['metadata']['namespace'] = "default"
del workflow_dict['spec']['serviceAccountName']
kfp.compiler.Compiler._write_workflow(workflow_dict, 'pipe.yaml')

Let’s explain the different parts of the script

@kfp.components.func_to_container_op
def print_func(param: int):
print(str(param))
@kfp.components.func_to_container_op
def list_func(param: int) -> list:
return list(range(param))

Wrapping your python functions (tasks) with @func_to_container_op decorator will convert the function to a task component and return a task (ContainerOp) factory. The task will run inside a Docker container (the default image is tensorflow/tensorflow:1.13.2-py3). It’s also possible to change the base_image.

@kfp.dsl.pipeline(name='pipeline')
def pipeline(param: int):
list_func_op = list_func(param)
with kfp.dsl.ParallelFor(list_func_op.output) as param:
print_func(param)

Wrapping your function with a @dsl.pipeline decorator will convert the function to a pipeline component that describes how the task components interact with each other. There are many different ways tasks can interact with each other (dags, loops, conditions, etc).

In the above example, the pipeline receives a parameter that will specify the number of sub-tasks to run. list_func_op is a container component that runs list_func, for each item in the list that list_func returns, KFP will launch another container that will run print_func with the relevant list item as a parameter. Each task will run in parallel on Kubernetes pods.

A sharp-eyed reader might ask: “How do different tasks transfer data between each other?”. Well, to do that, you will need to configure an Artifact Repository for Argo (for example S3 or GCS). You can also configure a different artifact repository for each pipeline using kfp.dsl.ArtifactLocation. In our case, KFP takes care of saving and loading the data by wrapping our functions with JSON serializers which save the data to the artifact store.

The final lines will compile the pipeline and output the Argo YAML.

workflow_dict = kfp.compiler.Compiler()._create_workflow(pipeline)# The following lines are needed to adapt kfp to argo yaml
workflow_dict['metadata']['namespace'] = "default"
del workflow_dict['spec']['serviceAccountName']
# Save yaml output
kfp.compiler.Compiler._write_workflow(workflow_dict, 'pipe.yaml')

Once you get the YAML, you can run it using Argo CLI:

argo submit --watch pipe.yaml -p param=5

Finally, this is how it looks like in Argo UI:

You can view the full pipeline script, including artifact configuration, in the following gist:

If you enjoyed this post, feel free to follow me on twitter

--

--

Founder of http://deep-solutions.net Machine Learning Practitioner, passionate about data science and ML engineering