
Apache Airflow for containerized data pipelines

Are you having problems running tasks with a different version of Python on Airflow? In this article, I explain how to solve this issue.

Photo by frank mckenna on Unsplash

Introduction

You have probably heard about Apache Airflow before, or maybe you’re using it to schedule your data pipelines right now. Your approach usually depends on what you need to run: you schedule Spark jobs with the SparkSubmitOperator and use the BashOperator to run commands on the machine where Airflow is running. Then things start to get interesting when you need to run Python code.

Again you go to the documentation and find the famous and useful PythonOperator. It’s straightforward: you put the Python code that you want to run in there, and it runs according to the configuration of your DAG. You just need to make sure that your code runs with Airflow’s Python version and that your libraries are installed on the system.
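For reference, here is a minimal sketch of that pattern, assuming Airflow 2.x; the DAG id, schedule, and callable below are made up for illustration:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def process_data():
    # This runs with the same Python interpreter and libraries as Airflow itself,
    # which is exactly the coupling discussed below.
    print("Processing data...")


with DAG(
    dag_id="python_operator_example",   # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    process = PythonOperator(
        task_id="process_data",
        python_callable=process_data,
    )
```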

The more your workflow uses Python, the more you use the PythonOperator. You’re happy because you can apply your Python knowledge to all your data science tasks. Airflow lets you install Python modules on the machine, so you have all your preferred libraries on hand to do remarkable stuff.

All of this works well until you or someone on your team wants to use a new version of Python or of a library. At that point you already have many tasks running with an older, incompatible version of the library, and from there you have two options:

  • Update the library and fix all the tasks that break.
  • Keep using the outdated version of the library, in line with all the existing tasks.

Let’s go deeper into those options and analyze the first one. You already have 30 tasks using the old version of the library, so you and your team set time aside to update the library and all the affected jobs. It’s not an easy task, but you all do it. In the end it’s rewarding, because you get to use all the new features of the library and your code base is up to date with the latest trends. But this doesn’t scale. What happens when you have 100 tasks running and need to do the same? Or when your old tasks, for whatever reason, need the old version of the library?

Now let’s analyze the second option, which seems less painful. You develop your tasks with an outdated version of the library and call it a day. It’s not ideal, but you probably figure out how to get the job done with the outdated version. Again, this doesn’t scale. After a year or two of developing with an outdated version, you’re going to face various problems. One of them is that you’ll eventually need something new that is too hard to accomplish with the outdated library, and you simply won’t be able to do it. This is really demotivating and is going to affect you and your team. In the end, your code base is not going to be a happy place where you can innovate and try new things.

Now let’s imagine an even worse scenario, in which we need to update the Python version of the system, or one of our libraries is incompatible with Airflow itself. This is a major problem, and it’s very common, especially when the team is growing.

So, what can we do to solve this?


Use Containers

The solution to our previous problems is to use Docker Containers.

Airflow scheduling containerized tasks - Image by Author

Imagine that you could create a DAG in which each task runs in a Docker container. Because of the isolation that containers provide, you can install whatever version of a library, or even of the language itself, that you want to use for each task.

This gives you the autonomy to build tasks with new technologies, without the burden of updating the tasks that are already running.

But how can I accomplish this?

Let’s explore a few options.

Scheduling jobs on Kubernetes

Airflow scheduling tasks with KubernetesPodOperator - Image by Author

If you don’t know what Kubernetes is, the official documentation is a good place to start.

With this approach, we launch Docker containers in the form of Pods on Kubernetes. Those Pods are ephemeral: once the task is finished, the Pod is destroyed and no longer consumes capacity on the cluster.

So, each task is a Docker container, and each container is launched on Kubernetes as a Pod. Airflow pulls the Pod’s logs from Kubernetes into the task’s log view in the UI, so where the task actually runs is transparent to you, and in the end you keep using Airflow as usual.

To achieve this, we use the KubernetesPodOperator.

The [KubernetesPodOperator](https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/_api/airflow/providers/cncf/kubernetes/operators/kubernetes_pod/index.html#airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator) uses the Kubernetes API to launch a pod in a Kubernetes cluster. By supplying an image URL and a command with optional arguments, the operator uses the Kube Python Client to generate a Kubernetes API request that dynamically launches those individual pods. Users can specify a kubeconfig file using the config_file parameter, otherwise the operator will default to ~/.kube/config.
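As a rough sketch of how this looks in a DAG (a hedged example rather than the only way to configure it; the namespace, image, and command below are placeholders, and import paths can vary slightly between provider versions):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

with DAG(
    dag_id="kubernetes_pod_example",   # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    transform = KubernetesPodOperator(
        task_id="transform",
        name="transform-pod",
        namespace="data-pipelines",            # placeholder namespace
        image="my-registry/transform:1.0.0",   # placeholder image with its own Python and libraries
        cmds=["python", "transform.py"],
        # config_file="/path/to/kubeconfig",   # defaults to ~/.kube/config
        is_delete_operator_pod=True,           # the Pod is removed once the task finishes
        get_logs=True,                         # stream the Pod's logs into the Airflow UI
    )
```

Because the image carries its own interpreter and dependencies, the version of Python or of any library inside it is completely decoupled from the Airflow installation.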

This is the most general solution I have found for scheduling containerized tasks, whether on the major cloud providers or on premises. Kubernetes is offered as a managed service on Azure, AWS, Google Cloud, Linode, you name it; wherever you are, there is probably a Kubernetes offering.

Scheduling jobs on GKE (Google Kubernetes Engine)

Airflow scheduling tasks on GKE - Image by Author

If you’re on Google Cloud and the Kubernetes cluster you’re planning to use is the managed version (GKE), the appropriate operator is the GKEStartPodOperator, which makes things easier. You could still use the KubernetesPodOperator, but in the end it’s more work to set up the environment, so if you’re on Google Cloud I think this is the recommended way to do it.
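A hedged sketch of the GKE variant, assuming the Google provider package is installed; the project, location, cluster name, and image are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKEStartPodOperator,
)

with DAG(
    dag_id="gke_pod_example",   # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    transform = GKEStartPodOperator(
        task_id="transform",
        name="transform-pod",
        project_id="my-gcp-project",      # placeholder GCP project
        location="us-central1-a",         # zone or region of the GKE cluster
        cluster_name="data-cluster",      # placeholder cluster name
        namespace="default",
        image="gcr.io/my-gcp-project/transform:1.0.0",  # placeholder image
        cmds=["python", "transform.py"],
        get_logs=True,
    )
```

The operator fetches the GKE cluster’s connection details for you, which is much of the setup work you would otherwise do by hand with the plain KubernetesPodOperator.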

Scheduling jobs on ECS (Elastic Container Service)

Airflow scheduling tasks on ECS - Image by Author

Now let’s consider that you’re on AWS. They offer a managed Kubernetes solution called Elastic Kubernetes Service, or EKS, and if you’re going to launch tasks there, you can use the KubernetesPodOperator.

But AWS also offers another container orchestration service called Elastic Container Service, or ECS, and as you may have guessed, there is an operator to launch containerized tasks there as well: the ECSOperator.

On ECS there are no Pods; the concept is a bit different, and the units of work are called tasks. So you are going to schedule Airflow’s containerized tasks as ECS tasks. I know, it sounds confusing, but the principle is the same as with Pods: they are ephemeral, so once the Airflow task is finished, it no longer consumes resources.
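A hedged sketch with the ECSOperator (in recent Amazon provider releases the same operator appears as EcsRunTaskOperator); the cluster, task definition, container name, subnet, and log group below are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import ECSOperator

with DAG(
    dag_id="ecs_task_example",   # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    transform = ECSOperator(
        task_id="transform",
        cluster="data-cluster",               # placeholder ECS cluster
        task_definition="transform-task:1",   # placeholder task definition
        launch_type="FARGATE",
        overrides={
            "containerOverrides": [
                {
                    "name": "transform",      # container name in the task definition
                    "command": ["python", "transform.py"],
                }
            ]
        },
        network_configuration={
            "awsvpcConfiguration": {
                "subnets": ["subnet-12345"],  # placeholder subnet
                "assignPublicIp": "ENABLED",
            }
        },
        awslogs_group="/ecs/transform",            # lets Airflow pull the container logs into the UI
        awslogs_stream_prefix="ecs/transform",
    )
```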


Considerations

  • Make sure that the cluster has all the permissions and credentials your tasks need when they run there. If the cluster is not the same one where Airflow is running and you’re migrating tasks, this is something you need to consider.
  • The cost of an extra Kubernetes or ECS cluster just for running tasks is not something to take lightly. Make sure you factor it in.
  • The KubernetesPodOperator that we used earlier to run tasks on Kubernetes is one thing; the Kubernetes Executor, which uses Kubernetes to run Airflow itself, is another. Please make sure you don’t confuse the two; a lot of the information online makes that easy to do.
  • You don’t necessarily need to run Airflow itself on Kubernetes or ECS; you can schedule tasks on external clusters.

Conclusion

This is not a silver bullet; as with every technical decision, there is a trade-off. Yes, containers give you better isolation, but they also add operational complexity. If you want this approach to work for you, you need a CI/CD pipeline in place, so that deploying a new version of a task is not a burden.

Still, it’s pretty cool not to be limited in the libraries or languages we can use for our tasks. This is something people usually overlook, but being able to experiment and try new things is a major advantage for a team’s morale.

