Introduction
How often is it that you’re writing a data pipeline and wish you could do something conditionally? Something that only happens if a set of conditions is satisfied?
Hopefully, not that often! Airflow has supported this type of functionality via the BranchPythonOperator. Many other workflow orchestration tools have followed suit. Prefect has conditional flows, Dagster has DynamicOutput, and in Orchestra we facilitate branching based on status.
This leads us to the most important question: Why?
Why bother with branching at all, thereby making your pipeline more complicated than it needs to be? We’ll see there are actually some pretty incredible use-cases, especially for folks looking for a greater amount of automation in their lives.
A quick example of Branching in Airflow
Before diving into use-cases, we’ll use the code below as a reference so we can understand how branching works in practice.
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from datetime import datetime

def choose_branch(**kwargs):
    value = kwargs['ti'].xcom_pull(task_ids='check_value')
    if value > 10:
        return 'path_a'
    else:
        return 'path_b'

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG('example_branching', default_args=default_args, schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)

check_value = PythonOperator(
    task_id='check_value',
    python_callable=lambda: 15,  # Example condition value
    dag=dag,
)

branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=choose_branch,  # context is passed automatically in Airflow 2
    dag=dag,
)

path_a = DummyOperator(task_id='path_a', dag=dag)
path_b = DummyOperator(task_id='path_b', dag=dag)

# The unchosen branch is skipped, so 'end' needs a relaxed trigger rule
# or it would be skipped too under the default all_success rule
end = DummyOperator(task_id='end', trigger_rule='none_failed_min_one_success', dag=dag)

start >> check_value >> branch_task >> [path_a, path_b] >> end
The choose_branch function returns a different task_id depending on a value stored in an XCom (a temporary data store for passing data between tasks). The branch_task is a separate task that invokes a Python callable (in this case the choose_branch function). By defining the path_a and path_b tasks, and adding these in list format as the possible downstream tasks of branch_task, Airflow knows how to branch based on the branching logic.
Automating Model Training and Deployment
Branching is really powerful in the Machine Learning and Data Science world.
Suppose you have a Machine Learning model that needs to be trained every week, because every week you receive new data.
It doesn’t follow that you’d want to deploy that model to Production every week as well. You would only want to do this subject to rigorous testing that you run post-training; at the very least, satisfying some accuracy benchmarks.
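This maps naturally onto a branch function in the style of the earlier example. The sketch below is illustrative: the task names, the accuracy threshold, and the XCom key are assumptions, not fixed conventions.

```python
# Hypothetical sketch: branch on post-training evaluation results.
# 'evaluate_model', 'deploy_model', 'skip_deployment' and the 0.92
# threshold are all illustrative names/values for this example.
ACCURACY_THRESHOLD = 0.92

def choose_deployment_branch(**kwargs):
    """Return the task_id to follow based on the model's test accuracy."""
    accuracy = kwargs['ti'].xcom_pull(task_ids='evaluate_model')
    if accuracy is not None and accuracy >= ACCURACY_THRESHOLD:
        return 'deploy_model'    # benchmarks met: promote to production
    return 'skip_deployment'     # benchmarks missed: keep the current model
```

Wired into a BranchPythonOperator, this means the weekly training run only ever promotes a model that clears the benchmark; otherwise the deployment path is simply skipped.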
Data Dead Letter Queues
In software engineering more broadly and specifically in streaming and event-driven architecture we have the concept of dead letter queues.
These are stores of event data that cannot be processed.
In data, we don’t have the equivalent of this. Data that’s badly formatted or not of the right schema simply sits where it is, failing to be processed and potentially causing issues.
Branching can be used to identify files that fail to be processed. If data quality checks fail or a file fails to be processed, a branch can be introduced to write a log of the file path to a temporary table.
Before the next ingestion job, the data pipeline should check the temporary table for unprocessed files. By using branching to conditionally store the file paths of files that aren’t processed, data pipelines can efficiently and incrementally process data even when failures occur (this is similar to the checkpoint concept in Spark Structured Streaming).
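A minimal sketch of such a dead-letter table might look like the following. SQLite and the table/column names here are assumptions for illustration; in practice this would be a temporary table in your warehouse.

```python
import sqlite3

# Illustrative "data dead letter queue": a table of file paths that
# failed processing, checked again before the next ingestion run.
# Table and column names are hypothetical.

def log_failed_file(conn, file_path, reason):
    """Record a file that failed processing so a later run can retry it."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS unprocessed_files (file_path TEXT, reason TEXT)"
    )
    conn.execute("INSERT INTO unprocessed_files VALUES (?, ?)", (file_path, reason))
    conn.commit()

def files_to_retry(conn):
    """Fetch paths that still need processing, for the next ingestion job."""
    rows = conn.execute("SELECT file_path FROM unprocessed_files").fetchall()
    return [row[0] for row in rows]
```

The branch that handles a failed quality check calls something like log_failed_file; the start of the next pipeline run calls files_to_retry and prepends those paths to its work list.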
Dynamically changing Resource Allocation
How annoying is it when processes fail with an out-of-memory error?
Adding branching based on specific error messages can significantly improve the resilience of existing systems.
For example, your choose_branch function could look like this:
def choose_branch(**kwargs):
    error_message = kwargs['ti'].xcom_pull(task_ids='run_task')
    if error_message == "Out of Memory Error":
        return 'path_a'
    else:
        return 'path_b'
In the context of something like EC2, you could then stop the process and modify the instance before starting it again:
import boto3
ec2 = boto3.client('ec2')
instance_id = 'i-1234567890abcdef0'
new_instance_type = 't2.large'
# Stop the instance
ec2.stop_instances(InstanceIds=[instance_id])
waiter = ec2.get_waiter('instance_stopped')
waiter.wait(InstanceIds=[instance_id])
# Modify the instance type
ec2.modify_instance_attribute(InstanceId=instance_id, InstanceType={'Value': new_instance_type})
# Start the instance
ec2.start_instances(InstanceIds=[instance_id])
The overall DAG would then look something like this, where RestartAndResizeEC2InstanceAndRunTask is a hypothetical custom operator wrapping the boto3 logic above:
path_a = DummyOperator(task_id='path_a', dag=dag)  # no resize needed
path_b = RestartAndResizeEC2InstanceAndRunTask(task_id='path_b', dag=dag)
run_task >> branch_task >> [path_a, path_b] >> end
For bonus points, you could even wrap this pattern in a Python decorator to implement automatic retries with conditional / branching instance management in the task itself!
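That decorator pattern might be sketched as follows. This is a simplified illustration, not an Airflow or AWS API: the resize hook and the use of MemoryError to signal the out-of-memory condition are assumptions for the example.

```python
import functools

# Illustrative retry-with-resize decorator. On an out-of-memory failure
# it calls a caller-supplied resize hook (e.g. the stop/modify/start
# boto3 sequence above) before retrying the task.

def retry_with_resize(resize_hook, max_retries=2):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except MemoryError:
                    if attempt == max_retries:
                        raise  # out of retries: surface the failure
                    resize_hook()  # e.g. stop, resize, and restart the instance
        return wrapper
    return decorator
```

The branching logic then lives inside the task itself: the happy path returns immediately, and the "resize" branch only runs when the out-of-memory failure actually occurs.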
Conclusion
Branching and conditionality are powerful features of many workflow orchestration tools. With Machine Learning and Generative AI taking centre stage, data teams will increasingly focus on automating these processes.
Many of these processes require branching. Model deployment and monitoring lifecycles are not deterministic, and branching is key to ensuring they can be automated.
Similarly, machine learning models and fine-tuning jobs require varying amounts of compute and resources. Based on the data received, data pipelines should be able to adapt in real-time to provision the relevant services where auto-scaling is not an option.
At the very least, Data Teams should be aware of these features and how important they will become for Machine Learning and AI! 🚀