How Fresh Are Your Data Sources?

Test the freshness of your tables and set up monitoring through Slack

Photo by Kaizen Nguyễn on Unsplash

The freshness of your data is an important aspect of data quality. If data isn’t up to date, it’s stale. If your business thinks it’s drawing insights from the most recent data but is really looking at data that’s a month old, there are going to be problems: the insights won’t be accurate.

Your data models are only as insightful as your data is fresh. So, how do you ensure your data is always being ingested in a timely manner?

My favorite way to do this is using dbt freshness tests for my data models, specifically tests on my source data. When you focus on the most upstream data sources, you catch the issue at its origin rather than downstream, which makes it much easier to debug and can save you hours of time.

So, let’s discuss how we can set up freshness tests at the source and how we can send these messages to Slack.

Setting up dbt freshness tests

dbt makes it super simple to test any of your tables for freshness. While testing your models for freshness can be important, the most important thing is to make sure your raw data sources are being ingested each day. I’ve run into issues where my data connectors showed that the data was successfully ingested, yet the data turned out to be stale when I performed a manual check.

Setting up alerts will make it so you don’t have to manually check the latest dates in your data columns. dbt tests will give you a warning and then a failure based on the thresholds you set.

To set up dbt source freshness, you simply navigate to the src.yml file where you define your dbt sources. Similar to a description or columns block, you add a freshness block under the source you wish to test. You can add this block at the source level, which applies the test to every table in that source.

src.yml (image by author)
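For reference, here is a minimal sketch of that source-level setup; the source and table names are hypothetical, and the loaded_at_field follows Fivetran’s convention (more on that below):

version: 2

sources:
  - name: shopify  # hypothetical source name
    database: raw
    loaded_at_field: _FIVETRAN_SYNCED  # explained further below
    freshness:  # applies to every table in this source
      warn_after: {count: 1, period: day}
      error_after: {count: 3, period: day}
    tables:
      - name: orders
      - name: customers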

Or, if you don’t want the test to run for every table, you can define it directly under the table name.

src.yml (image by author)

You would also define it for each individual table if you want to use different warn_after and error_after thresholds.
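As a sketch using the same hypothetical tables, per-table thresholds would look like this:

    tables:
      - name: orders
        freshness:
          warn_after: {count: 1, period: day}
          error_after: {count: 2, period: day}
      - name: customers  # slower-moving table, so looser thresholds
        freshness:
          warn_after: {count: 7, period: day}
          error_after: {count: 14, period: day}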

warn_after specifies how much time can pass before you get a warning when running your dbt test (in the sketches above, one day). The warning won’t error out, but it lets you know to look into the table’s freshness before it does error out.

error_after specifies how much time can pass before you get a failure. If you are running these tests in a data pipeline, this failure will cause your pipeline to fail.

Ideally, we want to know when dbt tests are warning us about the freshness of our tables so we can act before they fail outright. There is no "easy" way to do this through dbt itself unless you are running your models with dbt Cloud.

Lastly, you also need to include a loaded_at_field key at the same indentation as the freshness block. This tells dbt which column to compare against the time the test is run. If you are using Fivetran to ingest your data, you would want to use the _FIVETRAN_SYNCED column, which is simply the timestamp at which the data connector last ingested data into the warehouse.
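As a sketch (the column name follows Fivetran’s convention; the thresholds are illustrative):

    loaded_at_field: _FIVETRAN_SYNCED  # same indentation as the freshness block
    freshness:
      warn_after: {count: 1, period: day}
      error_after: {count: 3, period: day}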

Sending Slack alerts using Prefect

If you aren’t familiar with Prefect, it is a data orchestration tool that lets you write data pipelines in Python. I run my dbt models, and any other dbt command, in a dbt_task() function built within Prefect. My freshness tests get run like this:

dbt_source_freshness = dbt_task(command='dbt source freshness', task_args={"name": "dbt tests: source freshness"})

I also set my data syncs as upstream dependencies for these tasks. This way, the freshness test won’t run until after all of my data connectors have been synced by Fivetran. Setting dependencies within Prefect looks like this:

dbt_source_freshness.set_upstream(shopify_sync)
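To make the wiring concrete, here is a rough sketch of how these pieces might sit inside a Prefect flow; the flow name and sync_shopify placeholder are illustrative, and the DbtShellTask config is trimmed down (the full version appears later):

from prefect import Flow, task
from prefect.tasks.dbt import DbtShellTask

dbt_task = DbtShellTask(return_all=True, log_stdout=True)  # full config shown later

@task
def sync_shopify():
    """Placeholder for whatever task triggers your Fivetran sync."""
    pass

with Flow("dbt-freshness") as flow:
    shopify_sync = sync_shopify()
    dbt_source_freshness = dbt_task(
        command='dbt source freshness',
        task_args={"name": "dbt tests: source freshness"},
    )
    # the freshness test waits for the connector sync to finish
    dbt_source_freshness.set_upstream(shopify_sync)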

In order to send a Slack message, you need to create your own Python function. Prefect makes it easy by having a simple one already built out for you.

from prefect import task
from prefect.tasks.notifications import SlackTask
from prefect.triggers import all_finished

@task(trigger=all_finished, checkpoint=False, log_stdout=True)
def send_slack_alert_on_test(output):
    SlackTask(message=output).run()

This function is triggered once all of its upstream dependencies have finished running, as specified by trigger=all_finished; there are other trigger options as well, depending on your use case. It sends the output passed into it as a Slack message to the channel you specify. You can read more about setting up a Prefect Slack webhook here.

In order for this function to fit my use case, I had to customize it a bit. First, I wanted to iterate through each line of the output rather than send the entire output as one message. I also wanted to send only the warning messages, not the ones that passed or failed. To do this, I iterated through each line of the output and only sent the ones that contained "WARN freshness".

@task(trigger=all_finished, checkpoint=False, log_stdout=True)
def send_slack_alert_on_test(output):
    for line in output:
        # only forward lines that contain freshness warnings
        if "WARN freshness" in line:
            SlackTask(message=line).run()

I didn’t want to send the "FAIL freshness" messages because, if the test fails, the Prefect task errors out anyway and I already receive a different notification for that.
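Within the flow, the freshness task’s output feeds directly into the alert task. Continuing the hypothetical sketch from above:

with Flow("dbt-freshness") as flow:
    shopify_sync = sync_shopify()
    dbt_source_freshness = dbt_task(
        command='dbt source freshness',
        task_args={"name": "dbt tests: source freshness"},
    )
    dbt_source_freshness.set_upstream(shopify_sync)
    # trigger=all_finished means this runs whether the test warns, passes, or fails
    send_slack_alert_on_test(dbt_source_freshness)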

I did run into one issue that is important to keep in mind. If you don’t specify return_all=True in your DbtShellTask(), only the last line of output is returned from the task. That last line looks like this:

Image by author

You want the output to look something like this so you can iterate through each line:

Image by author

With this added, your shell task will look like this:

dbt_task = DbtShellTask(profile_name='madison',
                        log_stdout=True,
                        profiles_dir=DBT_REPO_PATH,
                        environment='prod',
                        return_all=True)

Now you are ready to deploy your dbt freshness tests and Slack alert using Prefect!

Conclusion

Setting up monitoring for your data tables is one of the first steps toward creating a reliable data stack. It helps you fight fires before they break out, keeping you proactive rather than reactive. If you think everything is always going to go according to plan, you are wrong. You need to put tools in place that set you up for success, even when you expect the worst.

This is a great solution for those who use dbt but don’t have a sophisticated data quality or monitoring platform in their data stack. Anyone can set up these tests, as long as they’re already using dbt. Steps like these require little work up front but will save you a lot of work in the long run.

For more tips on using dbt and improving your data stack, subscribe to my Learn Analytics Engineering newsletter.

Read more about data ingestion best practices.

Learn to set up data volume alerts using a free dbt package.

