
In this story, I would like to raise a discussion on how we transform data. Whether it's a database, a data warehouse or a reporting solution, we run data transformations based on data models – but how do we organise them? I would like to talk about modern data transformation tools and how you use them. We will touch on some nuances of the modular approach, scheduling and data transformation tests. At the end of this article, I will provide an example application that runs data modelling tasks with data lineage and self-documenting features. I'm very keen to know what you think about it.
I have witnessed dozens of ways to run data transformations. Throughout my more than fifteen-year career in Big Data and analytics, I have built data pipelines with different design patterns, and I'm sure there are more. That's why I like the technology world so much. The multitude of possibilities it offers is simply amazing.
Which operating system do you use for your data warehouse?
Modern data transformation tools
Modern data transformation tools, also known as data modelling tools or data warehouse (DWH) operating systems, were designed to simplify SQL data manipulation tasks to create datasets, views and tables. Often they use a SQL-like dialect to run any data definition (DDL) and data manipulation (DML) statements we might need, including data transformation tests and custom dataset creation in development mode.
The abundance of ANSI-SQL data warehouse solutions on the market makes these tools extremely useful. For instance, consider the list of dbt adapters below – all the market leaders are present there.

dbt stands for data build tool, and it is essentially a scheduler application that can be run locally or on a server to run data transformation tasks. For example, consider the simple model below. It creates a view in our database, and we can materialise it, say, every 5 minutes to preserve the data for analytics. At the top of the file we have some metadata config that dbt will use while running the task against the relevant database and schema, i.e. materialized='view' tells dbt to materialise it as a view.
-- ./models/example/table_a.sql
{{ config(
materialized='view',
tags=["finance"]
) }}
with source_data as (
select 1 as id
union all
select null as id
)
select *
from source_data
Having this file we can simply run the scheduler from our command line like so:
dbt run --select tag:finance
19:27:51 Running with dbt=1.7.7
19:28:02 Registered adapter: snowflake=1.7.1
19:28:04 Found 2 models, 4 tests, 0 sources, 0 exposures, 0 metrics, 430 macros, 0 groups, 0 semantic models
19:28:04
19:28:12 Concurrency: 1 threads (target='dev')
19:28:12
19:28:12 1 of 1 START sql view model events.table_a ..................................... [RUN]
19:28:16 1 of 1 OK created sql view model events.table_a ................................ [SUCCESS 1 in 3.98s]
19:28:19
19:28:19 Finished running 1 view model in 0 hours 0 minutes and 15.42 seconds (15.42s).
19:28:19
19:28:19 Completed successfully
19:28:19
19:28:19 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
Of course, there is a lot more to it regarding the actual setup and project settings.
Essentially they are just schedulers with templating capabilities
However, this is a rough idea of what this type of tool does with the data we load into our database. Data loading is a completely different topic though and I previously wrote about it here [1].
Here is another example from a similar tool called Dataform. It also creates a view, using its .sqlx templating format, and it also uses some metadata to tell the system what to do with the file, e.g. which schema to use, description, tags, etc.
--./definitions/test_v.sqlx
config {
type: "view",
schema: "analytics",
disabled: false,
tags: ["test", "view"],
description: "A view with ."
}
select 2+3 as v
So these tools offer an extensive list of improvements over the standard SQL way of running things. This would include the following:
- SQL templates – we can reuse the same one in different data environments
- Metadata – useful template settings, e.g. tags and whatever else we might want to apply to our database object or the task we run
- Scheduling capabilities – e.g. run some table updates every 15 minutes
- SQL transformation unit tests – one of my personal favourites. Not too many companies actually test their analytics queries, but I think this is a must.
- Schema customisations – very useful for development mode (see the macro sketch right after this list)
- Data lineage and self-generated documentation
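To give one concrete example for the schema customisation point: in dbt this is typically done by overriding the generate_schema_name macro in the macros folder. Below is a minimal sketch that mirrors dbt's documented default behaviour – appending a custom schema name to the target schema – which you could adapt to add development-style suffixes instead:
-- ./macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}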
How to use templates
So we can see that SQL templates are an essential part of any data transformation tool. They aim to organise our SQL queries as modules and reuse them in different environments or databases following the DRY principle.
DRY – don't repeat yourself
Let's go back to our first dbt example of a view. You probably noticed that we didn't mention any schema or database in that view definition. This is how these tools work – they apply those settings using config files! For instance, depending on our target environment, this view table_a can be created either in the staging database or in the production one, following our data warehouse design pattern [2]. This is a very useful and powerful technique, as we can run our SQL data transformation development in a completely isolated development (staging) branch. So, for instance, if we built our DWH using two main database suffixes (_dev and _prod), our view will be created only in staging until we promote it to production.

Applying environment suffixes would result in this DWH structure:

DATABASE_NAME SCHEMA_NAME
-------------------------------
RAW_DEV SERVER_DB_1 -- mocked data
RAW_DEV SERVER_DB_2 -- mocked data
RAW_DEV EVENTS -- mocked data
RAW_PROD SERVER_DB_1 -- real production data from pipelines
RAW_PROD SERVER_DB_2 -- real production data from pipelines
RAW_PROD EVENTS -- real production data from pipelines
...
BASE_PROD EVENTS -- enriched data
BASE_DEV EVENTS -- enriched data
...
ANALYTICS_PROD REPORTING -- materialised queries and aggregates
ANALYTICS_DEV REPORTING
ANALYTICS_PROD AD_HOC -- ad-hoc queries and views
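In dbt, this dev/prod split is usually driven by targets in profiles.yml. Below is a minimal sketch assuming a Snowflake warehouse (which this article uses later); the profile name, account, warehouse and credential variables are placeholders you would replace with your own:
# ./profiles.yml
my_snowflake_dwh:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: my_account              # placeholder
      user: "{{ env_var('DBT_USER') }}"
      password: "{{ env_var('DBT_PASSWORD') }}"
      warehouse: transforming          # placeholder
      database: ANALYTICS_DEV          # development database
      schema: REPORTING
      threads: 1
    prod:
      type: snowflake
      account: my_account              # placeholder
      user: "{{ env_var('DBT_USER') }}"
      password: "{{ env_var('DBT_PASSWORD') }}"
      warehouse: transforming          # placeholder
      database: ANALYTICS_PROD         # production database
      schema: REPORTING
      threads: 4
Running dbt run -t dev or dbt run -t prod against the same project then decides which database the very same template is materialised in.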
So by using this one simple template, we can create views in different databases, which makes it an ideal solution for the development/production environment split. This topic would typically also involve automated SQL testing using CI/CD techniques. Some very detailed examples can be found in one of my stories here [3].
Unit Tests for SQL Scripts with Dependencies in Dataform
Data modelling or data transformation?
These concepts are closely related, as any data transformation tool requires a model as an abstraction to run and produce (or materialise) some data. Any data model can be considered a combination of dimensions and measures, and we usually describe this combination using SQL. In dbt, for instance, these are actually called models. Consider the sample dbt project folder structure below. We can see that our view lives in the models folder, and the model folder is called "example". It has a schema definition provided in the schema.yml file, which tells how objects, tables and views relate to each other.
.
├── README.md
├── analyses
├── dbt_project.yml
├── logs
│   └── dbt.log
├── macros
├── models
│   └── example
│       ├── schema.yml
│       ├── table_a.sql
│       └── table_b.sql
├── profiles.yml
├── seeds
├── snapshots
├── target
│   ├── compiled
│   ├── graph.gpickle
│   ├── graph_summary.json
│   ├── manifest.json
│   ├── partial_parse.msgpack
│   ├── run
│   ├── run_results.json
│   └── semantic_manifest.json
└── tests
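The contents of schema.yml are not shown in this article, but a minimal sketch for the two example models could look like the following. The descriptions are made up, and the two tests per model line up with the "2 models, 4 tests" line in the dbt log above; note that the not_null test on table_a would actually flag the null row from the earlier example, which is exactly the kind of thing these tests are for:
# ./models/example/schema.yml
version: 2

models:
  - name: table_a
    description: "Example view with sample ids"
    columns:
      - name: id
        description: "Primary key"
        tests:
          - unique
          - not_null
  - name: table_b
    description: "Example model built on top of table_a"
    columns:
      - name: id
        description: "Primary key"
        tests:
          - unique
          - not_null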
If we choose to add another view in this folder that depends on our first view, we can use templates to describe that dependency. For example, consider the SQL from table_b.sql below. We use the ref function to tell dbt that table_a is a dependency of this view or table.
{{ config(
tags=["example"]
) }}
select *
from {{ ref('table_a') }}
where id = 1
Now if we run dbt docs generate in our command line, dbt will generate documentation with a data lineage graph for our project. dbt docs serve would then actually serve the documentation website:
11:29:17 Running with dbt=1.7.7
11:29:17 Registered adapter: snowflake=1.7.1
11:29:17 Found 2 models, 4 tests, 0 sources, 0 exposures, 0 metrics, 430 macros, 0 groups, 0 semantic models
11:29:17
11:29:18 Concurrency: 1 threads (target='dev')
11:29:18
11:29:18 Building catalog
11:29:22 Catalog written to ...

I think this is one of my favourite features of modern data transformation tools. It helps a lot with documentation and often serves as a single point of truth for everything we do in our data warehouse. Data transformation tools auto-generate dependency graphs and make them available to internal users as data catalogues. This is a very powerful feature which improves transparency in your data team and lets you explain what you do to anyone with ease. One dependency graph where you can search for a table name is worth a thousand docs! I think this is what modern Data Engineering is all about [4].

How to schedule table updates
Let's imagine we have a table that we want to update with new data every 15 minutes. We can use a MERGE statement defined in one of our templates like so:
config {
type: "operations",
hasOutput: true,
schema: "production",
disabled: false,
name: "user_reputation",
dependencies: ["reputation_data", "reputation_data_v"],
tags: ["user_reputation"],
description: "user_reputation based on reputation_data from firehose"
}
create table if not exists ${self()} (
user_id int64
,updated_at timestamp
)
PARTITION BY DATE(updated_at)
;
merge ${self()} t
using (
select
user_id
, latest_updated_at
from
(
select
user_id
, max(updated_at) as latest_updated_at
from
${ref("reputation_data_v")}
group by
user_id
) y
) s
on t.user_id = s.user_id
when matched then
update set updated_at = s.latest_updated_at, user_id = s.user_id
when not matched then
insert (updated_at, user_id) values (latest_updated_at, user_id)
;
This is a Dataform incremental table update I wrote once [5] for one of my tables, but how do we schedule it?
Tools like Dataform and dbt are schedulers in a nutshell, and we can use their built-in features to schedule something by simply defining it in their configuration files. For instance, in Dataform we would want to create a "cron": "15 * * * *" schedule in a file called environments.json:
{
  "environments": [
    {
      "name": "test",
      "configOverride": {
        "defaultSchema": "analytics",
        "defaultDatabase": "mydwh-data-tests"
      },
      "schedules": [
        {
          "name": "test_workflow_id",
          "cron": "00 07 * * *",
          "tags": [
            "test",
            "test"
          ],
          "options": {
            "includeDependencies": false
          },
          "disabled": true,
          "gcp": true,
          "notify": [
            {
              "channel": "dataTeamEmails",
              "statuses": [
                "FAILURE"
              ]
            }
          ]
        }
      ],
      "gitRef": "master"
    },
    {
      "name": "production",
      "configOverride": {
        "defaultSchema": "analytics",
        "defaultDatabase": "mydwh-data"
      },
      "schedules": [...]
    },
    {
      "name": "staging",
      "configOverride": {
        "defaultSchema": "analytics",
        "defaultDatabase": "mydwh-data"
      },
      "schedules": [...]
    }
  ]
}
Actually, this is a great example of a data environment setup, as we can see the different environments the scheduler will use to run SQL scripts, the git branches referring to each, and the possible config overrides that might be required. In dbt we can schedule runs using dbt Cloud, a managed service where we create schedules via the UI, but we can also use its API to trigger runs or update schedules.
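As a rough illustration, a job configured in dbt Cloud can be triggered with a single API call. The sketch below assumes you already have an account id, a job id and an API token (all placeholders here), and that the requests library is available:
# trigger_dbt_cloud_job.py -- minimal sketch, not the full API surface
import requests

ACCOUNT_ID = "12345"   # your dbt Cloud account id (placeholder)
JOB_ID = "67890"       # the job configured in the dbt Cloud UI (placeholder)
API_TOKEN = "***"      # a dbt Cloud API token (placeholder)

# Trigger the job via the dbt Cloud v2 API
response = requests.post(
    f"https://cloud.getdbt.com/api/v2/accounts/{ACCOUNT_ID}/jobs/{JOB_ID}/run/",
    headers={"Authorization": f"Token {API_TOKEN}"},
    json={"cause": "Triggered via API"},
)
response.raise_for_status()
print(response.json()["data"]["id"])  # id of the queued run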
Not just tables…
This way we can schedule create and update operations for any object in our data warehouse in any environment. [6]
Easy way to create Live and Staging environments for your Data
Ok, that's great, but what if we want to create our own bespoke scheduler application to run data modelling tasks? There are a few ways we can do it. One would be to use the existing dbt or Dataform libraries to build an API service that can be triggered by an API call. It would then run through the configuration files, pick a schedule and run it. We can even do it serverless, using Cloud Functions or AWS Lambda.
dbt example with AWS Lambda and Terraform
This is just an example of how to do it using AWS Lambda and the dbt-core Python library. One thing to keep in mind is that dbt will generate logs during execution, so we need our dbt project to reside in the /tmp/ Lambda folder – the only writable place in Lambdas.
Yup, that’s right. AWS Lambda and dbt!
Another thing to consider is that dbt-core uses multi-threading [7], which is not supported in Lambdas, so we will need to patch it in our code, limiting the task runner to a single thread. I will use Snowflake as a DWH, but this approach will work with anything else, e.g. BigQuery, Redshift, etc. After we create a Docker image, we can deploy it using Terraform [8].
Our dbt-runner project structure will be the following:
.
├── dbt-runner
│   ├── Dockerfile
│   ├── app.py -- Lambda handler
│   ├── auth.py
│   ├── config
│   ├── dbt_env
│   ├── dbt_packages
│   ├── env_dev.json
│   ├── env_prod.json
│   ├── event.json
│   ├── lambda_config.py
│   ├── package
│   ├── my_snowflake_dwh -- dbt project folder
│   ├── requirements.txt
│   ├── test
│   └── tmp
├── readme.md
└── terraform
    ├── environment
    └── module
The libraries we need are listed in requirements.txt:
dbt-core==1.7.7
dbt-snowflake==1.7.1
python-lambda-local==0.1.13
# boto3==1.34.29
dbt-extractor==0.5.1
dbt-semantic-interfaces==0.4.3
pyyaml==6.0
pytest==8.0.0
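The Dockerfile from the structure above is not shown in this article, but a minimal sketch based on the public AWS Lambda Python base image could look like this (the paths follow the project layout above; adjust them to your own setup):
# ./dbt-runner/Dockerfile -- minimal sketch
FROM public.ecr.aws/lambda/python:3.9

# Install Python dependencies into the Lambda task root
COPY requirements.txt .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"

# Copy the application code and the dbt project
COPY app.py auth.py lambda_config.py ${LAMBDA_TASK_ROOT}/
COPY my_snowflake_dwh ${LAMBDA_TASK_ROOT}/my_snowflake_dwh

# Lambda entry point: module.function
CMD [ "app.lambda_handler" ]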
Our main application file app.py will look like this:
"""Main app to run data transformations using dbt-core
Can be deployed as AWS Lambda
Can be triggered by any event with cli_args
Can be run locally using event payload from event.json
"""
# Patch multi-threading for Lambda
import queue
import threading
from concurrent.futures import ThreadPoolExecutor
import dbt.flags as dbt_flags
# Override multiprocessing ThreadPool with
# a ThreadPoolExecutor that doesnt use any
# shared memory semaphore locks
class CustomThreadPool:
def __init__(self, num_threads):
self.pool = ThreadPoolExecutor(max_workers=num_threads)
# provide the same interface expected by dbt.task.runnable
def apply_async(self, func, args, callback):
def future_callback(fut):
return callback(fut.result())
self.pool.submit(func, *args).add_done_callback(future_callback)
# we would need to actually keep a "closed"
# attribute lying around and properly check it
def close(self):
pass
# shutdown(wait=True) mimics "join", whereas
# shutdown(wait=False) mimics "terminate"
def join(self):
self.pool.shutdown(wait=True)
import multiprocessing.dummy
multiprocessing.dummy.Pool = CustomThreadPool
# Replace Multiprocessing context with threaded context
# The objects mostly have the same api
class ThreadedContext:
Process = threading.Thread
Lock = threading.Lock
RLock = threading.RLock
Queue = queue.Queue
def get_threaded_context():
return ThreadedContext()
# override both just in case :)
dbt_flags._get_context = get_threaded_context
dbt_flags.MP_CONTEXT = ThreadedContext()
# Third-party dependencies
import os
import logging
from dbt.cli.main import dbtRunner, dbtRunnerResult
# from dbt.logger import log_manager
from distutils.dir_util import copy_tree
# Local dependencies
from auth import get_private_key
from lambda_config import get_work_dir, get_settings
# Environment varibales
ENV = os.environ['ENV']
# provide any other env variables you might need
# ...
def lambda_handler(event, context):
    try:
        # Copy dbt project folder to the /tmp/ Lambda folder to make it writable:
        copy_tree('./my_snowflake_dwh', '/tmp/my_snowflake_dwh')
        # Get credentials
        creds = get_creds(ENV)
        # Set dbt secrets defined as secret variables in dbt ./profiles.yml
        # i.e.
        # profile:
        #   target: dev
        #   outputs:
        #     dev:
        #       type: snowflake
        #       ....
        #       user: "{{ env_var('DBT_USER') }}"
        #       password: "{{ env_var('DBT_PASSWORD') }}"
        #       ....
        os.environ['DBT_USER'] = creds['user']
        os.environ['DBT_PASSWORD'] = creds['password']
        # Tell dbt what to run using the event payload:
        # create CLI args as a list of strings
        env_arg = [f"-t{ENV}"]  # target environment
        # cli_args = ["run", "--select", tags] + env_arg
        cli_args = event['cli_args'] + env_arg
        print(f'cli_args= {cli_args}')
        # Init dbtRunner
        dbt = dbtRunner()
        # Run and inspect the results
        res: dbtRunnerResult = dbt.invoke(cli_args)
        for r in res.result:
            print(f"{r.node.database}.{r.node.schema}.{r.node.name}: {r.status}")  # noqa E501
    except Exception as e:
        print(e)
        return {
            'statusCode': 500,
            'body': {'result': str(e)}
        }

    message = 'Successfully ran dbt models in {}.'.format(ENV)
    return {
        'statusCode': 200,
        'body': {'result': message}
    }
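The docstring above mentions running the handler locally; with the python-lambda-local package from requirements.txt this could look roughly like the command below (the ENV variable is expected by the handler, and the timeout value is arbitrary):
# run the handler locally with the payload from event.json (timeout in seconds)
ENV=dev python-lambda-local -f lambda_handler -t 900 app.py event.json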
Now we would want to build an image and push it to our AWS ECR repository. We can use a shell script for that:
REPO_NAME=dbt-runner-staging
AWS_ACCOUNT=0123456789 # your AWS account number
SERVICE_NAME_1=dbt-runner
# Get date and time to create unique s3-key for deployment package:
date
TIME=`date +"%Y%m%d%H%M%S"`
# cd ./stack
cd ./dbt-runner
pwd
base=${PWD##*/}
zp=$base".zip"
echo $zp
rm -f $zp
# Install virtualenv and requirements
pip3 install virtualenv==20.17.1
virtualenv dbt_env -p python3.9
source dbt_env/bin/activate
pip3 install -r requirements.txt
echo "VIRTUAL ENV:" $VIRTUAL_ENV
## Alternatively use this (preferable in Python 3):
# cd ./stack
# python3 -m venv stack_env
# source stack_env/bin/activate
# echo "VIRTUAL ENV:" $VIRTUAL_ENV
# source stack_env/bin/activate
# pip install -r stack_requirements.txt
# Login to ECR
aws ecr get-login-password \
    --region eu-west-1 \
| docker login \
    --username AWS \
    --password-stdin $AWS_ACCOUNT.dkr.ecr.eu-west-1.amazonaws.com
# Create new ecr repo if not exists
ECR_REPO=$(aws ecr describe-repositories --repository-names ${REPO_NAME} --output text)
if [[ $? -eq 254 ]]; then
echo "Creating ECR repository ..."
CREATE_REPO=$(aws ecr create-repository --repository-name ${REPO_NAME} --image-scanning-configuration scanOnPush=true --image-tag-mutability MUTABLE --output text)
echo ${CREATE_REPO}
fi
# 0123456789.dkr.ecr.eu-west-1.amazonaws.com/dbt-runner-staging
# build
docker build --no-cache --platform=linux/amd64 -t $base .
# # for M1 chip users:
# docker buildx build --platform linux/amd64 -f ./Dockerfile -t $base .
# # keep in mind that AWS Lambda architecture must be set to architectures = ["x86_64"]
# tag and push
docker tag ${base}:latest ${AWS_ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com/${REPO_NAME}:${TIME}
docker push ${AWS_ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com/${REPO_NAME}:${TIME}
# Use this as an image URI
echo ${base}:latest ${AWS_ACCOUNT}.dkr.ecr.eu-west-1.amazonaws.com/${REPO_NAME}:${TIME}
Now all we need is to deploy the solution using this Docker image and AWS Lambda. Our Terraform Lambda module can look like this:
resource "aws_lambda_function" "dbt_runner" {
depends_on = [
null_resource.ecr_image # create using .sh script above
]
function_name = "dbt-runner-${var.env}"
timeout = 60
runtime = "python3.9"
architectures = ["x86_64"]
image_uri = "${aws_ecr_repository.repo.repository_url}@${data.aws_ecr_image.lambda_image.id}"
package_type = "Image"
role = aws_iam_role.lambda_exec.arn
environment {
variables = {
ENV = var.env
DBT_PROFILES_DIR = "/tmp/my_snowflake_dwh/"
DBT_PROJECT_DIR = "/tmp/my_snowflake_dwh/"
}
}
}
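To make this Lambda behave like a scheduler, we could also add an EventBridge (CloudWatch Events) rule that invokes it on a cron or rate expression. The sketch below is an assumption on top of the module above – the rate and the selected tag are arbitrary examples:
# Hypothetical schedule: run the dbt models tagged "finance" every 15 minutes
resource "aws_cloudwatch_event_rule" "dbt_runner_schedule" {
  name                = "dbt-runner-schedule-${var.env}"
  schedule_expression = "rate(15 minutes)"
}

resource "aws_cloudwatch_event_target" "dbt_runner" {
  rule  = aws_cloudwatch_event_rule.dbt_runner_schedule.name
  arn   = aws_lambda_function.dbt_runner.arn
  input = jsonencode({ cli_args = ["run", "--select", "tag:finance"] })
}

resource "aws_lambda_permission" "allow_eventbridge" {
  statement_id  = "AllowExecutionFromEventBridge"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.dbt_runner.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.dbt_runner_schedule.arn
}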
Now we can invoke our Lambda function with an event providing cli_args for dbt to run, defined in event.json:
{
"cli_args": ["run", "--select", "tag:finance tag:example"]
}
Invoke:
aws lambda invoke \
    --region=eu-west-1 \
    --function-name dbt-runner-dev out \
    --cli-binary-format raw-in-base64-out \
    --payload file://dbt-runner/event.json \
    --log-type Tail \
    --query 'LogResult' --output text | base64 -d

Conclusion
There are various ways to run data models in a database, data warehouse or BI tool. Doing it with plain standard SQL and cron jobs is often considered not the best practice. Modern data transformation tools, on the other hand, offer a vast range of improvements compared to conventional SQL techniques. This includes SQL templates, unit tests, custom schema suffixes, data lineage and self-generated documentation. Dependency graphs and data lineage are my favourite features, as they help a lot with documentation and often serve as a single point of truth for everything we do in our data warehouse. It simply makes things clearer.
Having said this, modern data transformation tools offer a simple yet extremely reliable way to test and transform data using a modular approach for each data environment. Data can be managed and transformed in many ways and with many tools. Which one to use depends on your stack and business requirements. I have witnessed a great number of use cases, and there can be even more. That's why I like technology. The multitude of possibilities it offers is simply amazing.
Recommended read
[1] https://medium.com/towards-data-science/advanced-etl-techniques-for-beginners-03c404f0f0ac
[2] https://medium.com/towards-data-science/data-warehouse-design-patterns-d7c1c140c18b
[3] https://towardsdatascience.com/unit-tests-for-sql-scripts-with-dependencies-in-dataform-847133b803b7
[4] https://medium.com/towards-data-science/modern-data-engineering-e202776fb9a9
[5] https://towardsdatascience.com/advanced-sql-techniques-for-beginners-211851a28488
[7] https://stackoverflow.com/questions/71255224/how-to-run-dbt-in-aws-lambda
[8] https://medium.com/towards-data-science/a-guide-to-data-engineering-infrastructure-cb074e0d3f99