Creating an environment with Airflow and DBT on AWS (part 3)

Using DBT Cloud and integrating Airflow with DBT

Arthur Chaves
Towards Data Science


Photo by tian kuan on Unsplash

In part 1 and part 2, we created and configured our EC2 instance with DBT and Airflow, and created an initial project for each of them to test the setup. Now we will finally use Airflow and DBT together, first on our instance, and then switching DBT to the Cloud version, creating a project there too.

1. Running the dbt command with Airflow

As we have seen, Airflow can schedule and orchestrate basically any kind of task that we can run with Python. We have also seen how to run DBT with the command dbt run. So, one way to integrate them is simply to create a DAG that runs this command on our OS.

Assuming you are connected to the EC2 instance and using the airflow user, create a DAG file:

$ vi ~/airflow/dags/load_users.py

Now, add this code:

from airflow import DAG
from airflow.operators.bash import BashOperator
import datetime

default_args = {
    'start_date': datetime.datetime(2021, 1, 1)
}

# Helper that returns a BashOperator which first moves to the DBT project
# folder and then runs the given dbt command
def run_dbt(task_id, command):
    return BashOperator(
        task_id=task_id,
        bash_command='cd /home/airflow/dbt; ' + command,
        do_xcom_push=False
    )

with DAG('load_user',
         schedule_interval="@daily",
         default_args=default_args,
         catchup=False) as dag:
    load_user = run_dbt('load_user', 'dbt run')

The task within the DAG runs the bash command dbt run, using Airflow's BashOperator. It's just as if we had executed the command on our terminal. Here, we have to run this command from the DBT folder (the one with dbt_project.yml and the other directories), so we create a function (run_dbt) that returns an operator with the command needed to move to that directory ('cd /home/airflow/dbt') prepended. We could instead point dbt run at the correct path with '--project-dir', but I find this way simpler if you have other DBT commands to run in your tasks. A sketch of the alternative follows below.
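In case you prefer the flag, here is a minimal sketch of the --project-dir alternative, assuming the project still lives in /home/airflow/dbt and your profiles.yml is in the default ~/.dbt location:

from airflow import DAG
from airflow.operators.bash import BashOperator
import datetime

# Minimal sketch: point dbt at the project folder with --project-dir
# instead of changing directories before running the command.
with DAG('load_user_project_dir',
         schedule_interval="@daily",
         start_date=datetime.datetime(2021, 1, 1),
         catchup=False) as dag:
    load_user = BashOperator(
        task_id='load_user',
        bash_command='dbt run --project-dir /home/airflow/dbt',
        do_xcom_push=False
    )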

After saving the file, all you have to do is wait until the new DAG is loaded in the Webserver (the Airflow UI running in your web browser). When it appears, turn on its toggle and the DAG will be executed.

Our DAG running DBT (only one DAG because I filtered by the name, ok?)

Click on the DAG name and go to the Graph View. Your task will probably be running (marked in light green). After a few seconds, the task will be marked in dark green, showing that it succeeded.

Our dbt task executed by Airflow

If you click on the task and open the log, you will see the same output you would get by running DBT from your terminal:

The output showing the execution of each model we have (each table/view created by DBT)

Since DBT is installed on the same machine as Airflow, we can simply run the dbt command against the OS of this instance. If DBT is installed on another machine, you can use other methods to run the command remotely, for example the Airflow SSH Operator, as sketched below.
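A minimal sketch of that remote approach, assuming the SSH provider package (apache-airflow-providers-ssh) is installed and an Airflow SSH connection named dbt_server points to the machine where DBT lives (both the connection name and the remote path are hypothetical):

from airflow import DAG
from airflow.providers.ssh.operators.ssh import SSHOperator
import datetime

# Runs dbt on a remote machine over SSH; 'dbt_server' is a hypothetical
# Airflow SSH connection and /opt/dbt is a hypothetical project path.
with DAG('load_user_remote',
         schedule_interval="@daily",
         start_date=datetime.datetime(2021, 1, 1),
         catchup=False) as dag:
    load_user = SSHOperator(
        task_id='load_user',
        ssh_conn_id='dbt_server',
        command='cd /opt/dbt && dbt run'
    )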

2. Creating the project on DBT Cloud

Now, we also have the option of not having DBT installed at all. Using DBT Cloud, we can create our project there and run our jobs on their infrastructure. To do so, you will have to create an account on the DBT Cloud page (https://cloud.getdbt.com/login).

Before logging in, we need to do two things.

First, go to the rules of the security group used by your RDS database. Edit the inbound rules and add three more, all with the type PostgreSQL, using the IPs 52.45.144.63, 54.81.134.249 and 52.22.161.231. These are the IPs used by DBT Cloud, and DBT needs to be allowed to connect to our database:

Adding the rules so DBT can run jobs in our database
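If you prefer to script this step instead of clicking through the console, here is a rough sketch with boto3; the security group ID below is a placeholder you would replace with your own:

import boto3

# Opens port 5432 for the three DBT Cloud IPs on the RDS security group.
# 'sg-0123456789abcdef0' is a placeholder for your own security group ID.
ec2 = boto3.client('ec2')
ec2.authorize_security_group_ingress(
    GroupId='sg-0123456789abcdef0',
    IpPermissions=[{
        'IpProtocol': 'tcp',
        'FromPort': 5432,
        'ToPort': 5432,
        'IpRanges': [
            {'CidrIp': '52.45.144.63/32', 'Description': 'DBT Cloud'},
            {'CidrIp': '54.81.134.249/32', 'Description': 'DBT Cloud'},
            {'CidrIp': '52.22.161.231/32', 'Description': 'DBT Cloud'},
        ],
    }],
)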

The other thing we must do is create a schema and a user to develop our project with DBT Cloud. This user will be used only for development. It needs access to the source tables in the data_lake schema, and it will create tables and views in the new schema (which is specific to this user), so it needs that permission too. Run the SQL code below using the admin user:

create user dev_user with password '1234';
create schema dbt_dev;
grant all privileges on schema dbt_dev to dev_user;
grant usage on schema data_lake to dev_user;
grant select on all tables in schema data_lake to dev_user;

Ok, now go back to the DBT Cloud page. After you create your account, when you log in for the first time, you will see something like this:

Let's use this to configure our project. Click Continue. On the next page you will choose the database type; in our case, PostgreSQL, so select it. On the next page, you will set up the connection with our database. Choose any name you want. In the PostgreSQL settings, use the information to connect to your RDS database (the same values you used to connect with DBeaver or your SQL tool). In Development Credentials, use our new user and schema.

Setting up the connection to develop our DBT project on DBT Cloud

Click on Continue. On the next page, you can choose to use a repository on GitHub/Bitbucket/GitLab, or a repository managed by DBT. We will use the latter. Choose a name, click Create and then Continue:

The next page allows you to invite people to work with you in the project. For now, you can just click on Skip & Complete.

We have our project created and DBT will show this page:

Click on Start developing and a page similar to an IDE will open. The first thing you need to do is click on 'initialize your project'. This creates the folders and files we need to run jobs on DBT.

The development page, after initializing the project

Now, delete the example folder inside models. Add a new file in models named user.sql, using the same code we used on our EC2 instance.

The same model we created on our EC2 instance, now on DBT Cloud
select * from data_lake.user

Save the file. At the bottom of the page there is a field to run commands. Type 'dbt run' and press Enter (you can see this field at the bottom of the image above) and you will get the following output:

Running the job on DBT Cloud IDE

And that's how we develop our data transformation with DBT. Now, let's say we want to use this code in a production environment. Still on the development page, click on Commit and add a message. Now our code is in the master branch (remember you can use GitHub/Bitbucket/GitLab and manage the repository and branches in a better way).

We could just use our database master user all the time, but to make this more realistic, in the previous step we used a user with permission only to read data from our source schema (data_lake) and with all privileges on its own schema, dbt_dev.

Now, to run our job in the "production" environment, let's create another user with the necessary permissions. We need permission to read data from the schema data_lake, just like the previous user. We also need permission to create objects in the schema data_warehouse (our models will be created there). Finally, since in part 2 we already created the data_warehouse.user table, we will delete it now, so DBT won't have any problem recreating it. Run the following SQL code with the admin user:

create user prd_user with password '1234qwer';
grant all privileges on schema data_warehouse to prd_user;
grant usage on schema data_lake to prd_user;
grant select on all tables in schema data_lake to prd_user;
drop table data_warehouse.user;

Continuing, click on the menu in the top-left corner and select Environments, then click on New Environment. Choose a name for the environment and configure the connection using our new user, its password and the schema data_warehouse (remember, in this schema we will create the objects with the results of our queries). Don't change the other options. But notice that DBT Cloud comes in handy here, since you can choose a custom branch and run jobs before you push your code to master.

DBT Cloud makes it easier to create a new environment and to work with branches

After you create the environment, you will see the environment page, with no job yet. Click on New Job.

Choose a name for the job. In the environment option, choose the only one available. In threads, I'm using 10, but that doesn't make a difference right now. Don't change the other options.

Creating our job

Scrolling down, we have the command we are going to run. Since our project has only one model, when we run DBT that is the only file that will be processed. But if you have a bigger project and want to run only part of your models, you can filter by path, tags and other options; check the DBT documentation on node selection to explore this. A small sketch follows below.
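For illustration, here is a sketch of what such filtered commands could look like if you reuse the BashOperator approach from section 1 (the model name and tag below are hypothetical, and depending on your DBT version the selection flag is --models or --select); the same command strings can also be typed into the DBT Cloud job command field:

from airflow import DAG
from airflow.operators.bash import BashOperator
import datetime

# Hypothetical examples of running only part of the project;
# 'user' is a model name and 'daily' a tag, both illustrative.
DBT_DIR = 'cd /home/airflow/dbt; '

with DAG('load_selected_models',
         schedule_interval="@daily",
         start_date=datetime.datetime(2021, 1, 1),
         catchup=False) as dag:
    run_one_model = BashOperator(
        task_id='run_user_model',
        bash_command=DBT_DIR + 'dbt run --select user'
    )
    run_tagged_models = BashOperator(
        task_id='run_daily_models',
        bash_command=DBT_DIR + 'dbt run --select tag:daily'
    )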

Another option still on this page is scheduling the job. We are going to use Airflow for that, so you can uncheck the option "Run on schedule?".

You can schedule jobs directly with DBT Cloud, but only the DBT part of your pipeline

After this, click on Save at the top of the page. On the next page, click on Run Now. You may have to wait a little for your job to start; DBT reloads the page every 10 seconds. After a while, your job will run and show the following result:

The job running on DBT Cloud (it has a few steps before dbt run)

The last of these steps was the dbt run command. Click on it and you will see the output we already know:

The same output of dbt run command

We have seen how to run DBT on our instance, how to use Airflow to schedule this and how to run DBT on DBT Cloud. The only part that is missing is how to use Airflow to run DBT Cloud jobs.

3. Using Airflow to run jobs on DBT Cloud

To run our jobs with Airflow, we are going to use the DBT Cloud API. So, all we need to do is put the proper code in our DAG and schedule it the same way as before.

First, let's get some data we need. On the last page we used, the one with our job, click on the job name.

The link to the job page, where we can find the job id

Now, from the URL on your browser, check the IDs of your account and of this job. For example, in my case, the URL was https://cloud.getdbt.com/#/accounts/19964/projects/32821/jobs/25507/. It means my account id is 19964 and my job id is 25507.

Now, go to the menu in the top-right corner and click on Profile. Then choose API Access and click on Show. That's your API Key, which will be used to make requests to the DBT Cloud API.

This is where you find your API Key

Moving forward, connect to your EC2 instance with the airflow user, using VSCode. Go to File > Open Folder. Choose airflow and you will have the path ‘/home/airflow/airflow’. Press Enter.

Right click on the dags folder and create a new file, named load_users_cloud.py:

You can use the interface of VSCode instead of the Terminal to create the project files

Now, copy the following code, using your own API key (instead of *****), account id and job id when needed.

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
import datetime, json

default_args = {
    'start_date': datetime.datetime(2021, 1, 1)
}

# Header with the DBT Cloud API key (replace ***** with your own key)
dbt_header = {
    'Content-Type': 'application/json',
    'Authorization': 'Token *****'
}

# Message shown in DBT Cloud for the run triggered by Airflow
def getDbtMessage(message):
    return {'cause': message}

# Endpoint that triggers a run of the given job
def getDbtApiLink(jobId, accountId):
    return 'accounts/{0}/jobs/{1}/run/'.format(accountId, jobId)

# Returns a SimpleHttpOperator that makes the POST request to the DBT Cloud API
def getDbtApiOperator(task_id, jobId, message='Triggered by Airflow', accountId=19964):
    return SimpleHttpOperator(
        task_id=task_id,
        method='POST',
        data=json.dumps(getDbtMessage(message)),
        http_conn_id='dbt_api',
        endpoint=getDbtApiLink(jobId, accountId),
        headers=dbt_header
    )

with DAG('Our_medium_project',
         schedule_interval="@daily",
         default_args=default_args,
         catchup=False) as dag:
    load_user_cloud = getDbtApiOperator('load_users', 25507)

Here, we use the SimpleHttpOperator to make a POST request to the API. The function getDbtApiOperator simplifies how we create our operator: we just have to define a name for the task and the job id. We can also define a message to be shown in DBT Cloud for the job execution (though I'm using a default value) and the account id, in case we use more than one DBT Cloud account in a single DAG.
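For example, if you had a second job (the id 12345 below is a placeholder, not a real job), you could add another task inside the same with DAG block, reusing the helper with a custom message and an explicit account id:

    # Hypothetical second DBT Cloud job; 12345 is a placeholder id
    load_orders_cloud = getDbtApiOperator(
        'load_orders',
        12345,
        message='Nightly orders load triggered by Airflow',
        accountId=19964
    )
    load_user_cloud >> load_orders_cloud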

Save the file and open Airflow on your browser. Go to Admin > Connections. Click on + to add a new connection.

In Conn Id, use dbt_api (if you use something different, you have to change the name in the code above as well). In Conn Type, choose HTTP. In Host, use "https://cloud.getdbt.com/api/v2/" (without quotes). Save the connection.

Setting up the connection to DBT API

Now, go to the DAGs page and turn on the toggle for our new DAG. After a few seconds, you will see the dark green indicator, showing that our task was executed.

Our DAG running the DBT Job

Go back to the DBT Cloud page, to the job page. You will see a new record in the job history, with the message we defined in our DAG.

The job history, executed manually and from Airflow

Okay, now we can use Airflow to run our DBT jobs, whether using DBT Cloud or DBT on a server. Although the example I used was pretty simple, the focus here was the integration between the two tools.

If you already use DBT, you know how to work with your jobs, so you can simply bring in Airflow to schedule them and integrate other steps not related to DBT, like moving the data to the data warehouse.
If you already use Airflow and need to transform data within your data warehouse, you can use DBT to do this, leveraging the references it creates between models, instead of running all the SQL commands with Airflow and creating and consuming temporary tables.
Anyway, I hope this can be useful somehow.

Read all the parts of this article:

Part 1: Launching an instance and installing Airflow
Part 2: Installing DBT and some settings to make the work easier
Part 3: Using DBT Cloud and integrating Airflow with DBT

References:

https://www.getdbt.com/
https://airflow.apache.org/
https://docs.getdbt.com/dbt-cloud/api
https://docs.getdbt.com/reference/node-selection/syntax
https://airflow.apache.org/docs/apache-airflow-providers-http/stable/operators.html

Sources that helped me with the basis of most of what I did here:
https://www.datascienceacademy.com.br (in portuguese)
https://docs.getdbt.com/docs/introduction
