How we use Airflow for SQL alerts and Slack notifications

Using Airflow to get notified over Slack when things don’t go as planned while loading data

Juan Gesino
Towards Data Science

At Sirena, we use Apache Airflow for running all of our data orchestration. Airflow is an open-source tool that was created by Airbnb to create and schedule data workflows. In essence, Airflow is an orchestrator that runs tasks on given frequencies while also handling backfilling, task dependencies and so much more. In Airflow, workflows are defined as Directed Acyclic Graphs (DAGs) that define the dependencies among different tasks.

At Sirena, we found another use for Airflow besides helping us move data from one place to another. We also use it to trigger alerts on certain data quality measures. The idea is quite simple: we define a SQL query and a condition for its result. The query should output a single number, and if that number does not meet the condition, we send a Slack alert. The beauty of these alerts is how simple they are to create. We have parametrised them in a way that allows us to write configuration files in YAML that get materialised as DAGs in Airflow (more about this in a future post about Airflow factories…). These alerts are especially good for making sure nothing catastrophic has happened to the data pipelines.

In this post, I will guide you through the process of setting up a similar workflow for your Airflow implementation.

Configuration Files

Before we dig into Python code, let’s stop for a second to think about how we want our configuration file to look. Besides all of the “required” fields that are needed to create DAGs in Airflow (a name, an interval, an owner, etc.), we are going to need a few other things.

Arguably the most important fields are those that we are going to use to actually define the alert. For that, we are going to need a field for our SQL query and a way to express our condition, such as “greater than 5”. We’ll break the condition into two parts: the criteria (greater than, less than, etc.) and the target value.

Besides that, we also need to define how we want to notify, who we want to notify and what the actual message will be. As I mentioned before, at Sirena we use Slack to send notifications, so our notification method is Slack. Our recipients come from a list of Slack channels or users, but we have a dedicated channel for all of the Data Team’s notifications. Lastly, we’ll use the description of the alert as the message, so that the notification tells us what happened.
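To make these fields concrete, here is a rough sketch of what one alert could look like once its YAML file has been parsed into a Python dictionary. Only `owner`, `schedule_interval`, `enabled` and `notifier` are names that appear in this post; every other key name, the table in the query and the channel name are assumptions made for illustration, and `validate_alert_config` is a hypothetical helper rather than part of our factory.

```python
# Hypothetical alert definition, shown as the dictionary its YAML file would parse into.
ALERT_CONFIG = {
    "name": "orders_loaded_today",       # assumed key, handy for building the DAG id
    "description": "No orders were loaded in the last 24 hours.",
    "owner": "data-team",
    "schedule_interval": "0 8 * * *",    # cron expression: every day at 08:00
    "enabled": True,
    # The table name is made up; the query must return a single number.
    "query": (
        "SELECT COUNT(*) FROM raw.orders "
        "WHERE loaded_at >= DATEADD(day, -1, CURRENT_TIMESTAMP)"
    ),
    "criteria": "greater_than",          # assumed key and value
    "target": 0,                         # assumed key
    "notifier": "slack",
    "recipients": ["#data-alerts"],      # assumed key and channel name
}

# Keys this sketch treats as mandatory (an assumption, not a rule from the post).
REQUIRED_KEYS = {
    "name", "description", "owner", "schedule_interval",
    "query", "criteria", "target", "notifier", "recipients",
}


def validate_alert_config(config):
    """Return a sorted list of the required keys missing from `config`."""
    return sorted(REQUIRED_KEYS - set(config))
```

Read as a sentence, this alert says: the number of rows loaded in the last day should be greater than 0, and if it is not, send the description to `#data-alerts` over Slack.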

Notice that the configuration also defines a notifier. This allows us to configure what service we'll use to send out the notifications to our list of recipients.

We can create one of these configuration files for every alert we want to set up and store them all in /dags/alerts/. Our DAG factory will pick up every configuration file in this directory and create the necessary DAGs, which is why the enabled property comes in handy: we can use it to enable or disable an alert without having to delete the entire file.
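As a minimal sketch of that pick-up step, the function below walks a directory, parses each file and skips the alerts that are disabled. The function name, the `*.yaml` file pattern and the choice to pass the parser in as an argument are all assumptions of this sketch; with PyYAML installed, the parser would typically be `yaml.safe_load`.

```python
from pathlib import Path
from typing import Any, Callable, Dict, Iterator


def iter_enabled_alerts(
    directory: str,
    parse: Callable[[str], Dict[str, Any]],
    pattern: str = "*.yaml",
) -> Iterator[Dict[str, Any]]:
    """Yield the parsed configuration of every enabled alert in `directory`.

    `parse` turns the text of one file into a dictionary.
    """
    # Sort the paths so alerts are always processed in the same order.
    for path in sorted(Path(directory).glob(pattern)):
        config = parse(path.read_text())
        # A missing `enabled` key counts as enabled (an assumption of this sketch).
        if config.get("enabled", True):
            yield config
```

In a DAG file this could be called as `iter_enabled_alerts("/dags/alerts/", yaml.safe_load)`. Keeping the parser as a parameter also makes the function easy to exercise without any YAML dependency.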

Alert Factory

Now we can jump to our alert factory. I suggest you follow my blog for an upcoming post all about DAG factories in Airflow. For now, it’s important to know that a factory is essentially a Python script that will iterate through all the configuration files in /dags/alerts/ and create a DAG for each one of those. The code for it is pretty straightforward, but let's go through it together.

The first thing we’ll want to do is load all of the configuration files from /dags/alerts/ and iterate over them. For each one, we'll want to create a DAG. To do this, we implemented a function, create_dag_alert, that takes the configuration file (as a Python dictionary) and returns an Airflow DAG with a single task that uses the PythonOperator.

Let’s break this function into parts and go over them one by one:

Part 1: Define the DAG

We start by defining the DAG and its default arguments based on the configuration file. For some of these values we actually use the config file (e.g. owner and schedule_interval), but for others we just hard-code the values we want to use (e.g. retries and catchup). This is up to you. We didn't see a lot of value in adding those to the configuration file at the moment, but we might do it at some point. The point is, don't feel bound to this way of doing it; do whatever works best for you.
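The split between configured and hard-coded values could be sketched as follows. This is not our exact code: `build_dag_arguments` is a hypothetical helper, the `name` and `description` keys, the `alert_` prefix, the start date and the retry values are assumptions, and only `owner`, `schedule_interval`, `retries` and `catchup` come from the description above.

```python
from datetime import datetime, timedelta


def build_dag_arguments(config):
    """Build the keyword arguments for an alert DAG from its config dictionary."""
    default_args = {
        "owner": config["owner"],               # read from the configuration file
        "retries": 1,                           # hard-coded (value is an assumption)
        "retry_delay": timedelta(minutes=5),    # hard-coded (value is an assumption)
    }
    return {
        "dag_id": f"alert_{config['name']}",    # prefix is an assumption
        "description": config.get("description", ""),
        "schedule_interval": config["schedule_interval"],  # read from the config
        "start_date": datetime(2021, 1, 1),     # hypothetical fixed start date
        "catchup": False,                       # hard-coded: alerts need no backfill
        "default_args": default_args,
    }
```

The result could then be unpacked into the DAG constructor, as in `DAG(**build_dag_arguments(config))`. Recent Airflow releases favour `schedule` over `schedule_interval`, so check the argument name against the version you run.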

Part 2: Task function

This is actually the bulk of the factory. What we want to do here is create a function that will be executed using the PythonOperator. Essentially, this is the function that should determine if the condition is met and if it's not, send out the notification.

We start by getting the query from the configuration file and running it against our data warehouse (in our case Snowflake, but this should work with any data warehouse). We then compare the result with the target value configured in the YAML file, using the criteria we specified. After that, we check whether we need to notify and, if we do, we call the appropriate function for the notifier we selected.
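The flow just described can be sketched in plain Python as below. To keep the example free of warehouse and Slack dependencies, the query runner and the notifiers are passed in as functions; that injection, the criteria names, the config keys other than `notifier` and the message format are all assumptions of this sketch rather than our production code.

```python
import operator

# Map criteria names from the config to comparison functions.
# The names themselves are assumptions of this sketch.
CRITERIA = {
    "greater_than": operator.gt,
    "greater_than_or_equal": operator.ge,
    "less_than": operator.lt,
    "less_than_or_equal": operator.le,
    "equal": operator.eq,
    "not_equal": operator.ne,
}


def run_alert(config, run_query, notifiers):
    """Run the alert query and notify when the condition is not met.

    `run_query` takes a SQL string and returns a single number.
    `notifiers` maps a notifier name (e.g. "slack") to a function
    that accepts `(recipients, message)`.
    Returns True when the condition holds, False when a notification was sent.
    """
    result = run_query(config["query"])
    compare = CRITERIA[config["criteria"]]
    if compare(result, config["target"]):
        return True  # condition met, nothing to report

    message = (
        f"Alert {config['name']}: {config['description']} "
        f"(got {result}, expected {config['criteria']} {config['target']})"
    )
    # Pick the notification function that matches the configured notifier.
    notifiers[config["notifier"]](config["recipients"], message)
    return False
```

In a real deployment, `run_query` would wrap the warehouse connection and the `"slack"` entry would wrap whatever Slack client you use. An unknown criteria or notifier name raises a `KeyError`, which makes the task fail loudly instead of silently skipping the check.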

Part 3: Set up the DAG and task

Lastly, we just need to add a task to the DAG to execute the _run_alert function using the PythonOperator.

Coming full circle, notice that the function returns the DAG (dag) we created, so that we can add it to the global scope.
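Adding the DAG to the global scope matters because Airflow discovers DAG objects among the top-level names of the files in the DAGs folder. A minimal sketch of that registration step, assuming a hypothetical `register_alert_dags` helper, a `name` key in each config and an `alert_` prefix for the DAG ids:

```python
def register_alert_dags(configs, create_dag, namespace):
    """Create one DAG per config and store it in `namespace` under its DAG id.

    `create_dag` is the factory function (create_dag_alert in this post) and
    `namespace` is normally the `globals()` dictionary of the DAG file.
    """
    for config in configs:
        dag_id = f"alert_{config['name']}"  # prefix is an assumption
        namespace[dag_id] = create_dag(config)
    return namespace
```

At the bottom of the factory file, this would be invoked as `register_alert_dags(configs, create_dag_alert, globals())`, where `configs` is the list of parsed configuration dictionaries.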

Conclusion

Now, there’s a lot to be said about this implementation. I think the most important points are the DAG factory approach and how simple it makes writing new alerts, but more about that in a future post. As for the alerts themselves, it’s good to bear in mind that they are not going to replace a data quality tool like great_expectations, or even dbt tests. In the same way that great_expectations and dbt tests can live together in harmony, I see a place for these SQL alerts alongside those tools. The way I think about it is that you can use these SQL alerts for data loading, dbt tests for testing your transformations and great_expectations for your final products, to make sure your data stakeholders are using reliable data.

Looking back at the implementation itself, as I was writing this post I noticed that this code is in need of a refactor, and there are some things that could be implemented in a cleaner way. But hey, I leave it as an exercise to the reader to find better ways to improve this script and maybe even extend it to more and better use-cases. In any case, hope this is useful as a base and sparks your creativity to improve your notifications regarding data loading into your data warehouse.

Economist • Insights Manager @ Belvo • Ex-Data Analytics Manager @ Zenvia • Ex-Head of Data @ Sirena