
Mastering ExternalTaskSensor in Apache Airflow: How to Calculate Execution Delta

External Task Sensors stop bad data from trickling downstream in a data pipeline. Leverage them to create a reliable data infrastructure.

External Task Sensors are like gatekeepers – they stop bad data from trickling downstream. Image by Freepik.

Orchestrating a data pipeline is a delicate endeavor. A pipeline can have thousands of tasks running simultaneously, and they are often dependent on one another. If we’re not careful, a single point of failure can have a domino-like effect that trickles downstream and messes up the whole pipeline.

Apache Airflow introduced the External Task Sensor to put an end to these issues. While it is an extremely powerful feature, it also comes with some degree of complexity.

In this introductory piece, I hope to untangle some of the confusion surrounding the External Task Sensor and show how we can use it to enhance the reliability of our data pipelines – making sense of sensors!

Why do we need an External Task Sensor?

Meet Jamie, a rookie chef at Airflow Bakery. She’s new. Her only responsibility is to make a new batch of cookie dough every hour.

Jamie’s responsibilities as shown in a "DAG" format. Chef (F) icon by Freepik.

And then we have Gordon Damnsie, the cookie master. Gordon takes the dough from Jamie and turns it into award-winning cookies.

Gordon’s responsibilities as shown in a "DAG" format. Chef (M) icon by Freepik.

One fine day, Gordon swoops in to grab the freshest dough he can find and bakes cookies. But when he takes a bite, yuck! "Bad" would’ve been an understatement. Gordon quickly discovers the root cause was the stale dough, which was left over from a week ago.

Gordon, visibly frustrated, tosses the cookies into the bin. After he composes himself, he slowly turns to Jamie and asks, "Why is the dough not fresh?"

"I had to stop making them, Chef. There was a problem with the raw ingredients," Jamie replies, trying to stay calm in the face of Gordon’s anger. Unfortunately, the bad cookies had already been served to clients and they no longer trust the food quality of the bakery.

This slight detour is a cautionary tale on the importance of validating the freshness of data sources. In the story, Gordon’s success is dependent on Jamie, but they are working independently without communicating with each other. They "trust" that the other person will do their job flawlessly. But as any data practitioner will know, everything that can go wrong will go wrong in a data pipeline.

Ideally, Gordon should check with Jamie whether she made dough recently. Once he has confirmed it, he knows the dough is fresh and can proceed to bake his cookies. Otherwise, he should stop baking and figure out what went wrong.

You see, what Gordon needs… is an external task sensor.

What does an External Task Sensor do?

An external task sensor checks whether other people completed their assigned task. It senses the completion of an external task, hence the name.

In the context of Airflow, Jamie and Gordon are DAGs. They have specific tasks that they need to complete.

When we add an External Task Sensor, it becomes the middleman that coordinates between the two independent DAGs. The sensor will check on Jamie at a specific time to see if she has completed her task.

If Jamie successfully completes her task, the sensor will inform Gordon so that he can carry on with his downstream tasks.

The external task sensor – check_dough() returns as a success after verifying that make_dough() ran successfully. Chef (F) and Chef (M) icons by Freepik.

If Jamie fails to complete her task, the sensor stops Gordon from doing any tasks that have a dependency on the failed task.

The external task sensor – check_dough() returns as a fail after verifying that make_dough() did not run successfully. Chef (F) and Chef (M) icons by Freepik.

Having this additional layer of validation essentially stops stale data from trickling further downstream and polluting the rest of our pipeline with dirty, inaccurate data.

How do we create an External Task Sensor?

Airflow makes it very easy to create an External Task Sensor – just import it. The syntax will look something like this:

from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['[email protected]', '[email protected]'],
    execution_delta=timedelta(minutes=30),
    # execution_date_fn=my_function,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

Here’s what they mean:

  1. dag is the current DAG object. Since Gordon is the one who wants to check whether Jamie made dough, this should point to Gordon’s DAG.
  2. task_id is the unique name for this External Task Sensor.
  3. external_dag_id is the name of the DAG you want to check. In this case, Jamie’s DAG.
  4. external_task_id is the name of the specific task you want to check. Ideally, we should always specify this. Otherwise, the sensor will check for the completion of the entire DAG instead of just one specific task. In other words, Gordon will do nothing until Jamie finishes chopping onions, washing dishes, and restocking the pantry, even though we only want to know whether she made dough. Or worse, if any one of these irrelevant tasks fails, the sensor will unnecessarily pause the entire pipeline.
  5. email is the list of people you want Airflow to notify when the External Task Sensor fails. Keep in mind that for this to work, you need to have the SMTP settings properly configured in the Airflow configuration file.
  6. execution_delta is arguably the most confusing part about External Task Sensors but also the most important. So, I’m dedicating an entire section to it below. Keep scrolling!
  7. execution_date_fn and execution_delta are very similar. We can only use one of them at a time. Sometimes it’s easier to use execution_date_fn rather than execution_delta. I’m also giving this its own section below.
  8. timeout limits how long a sensor can stay alive. When we create a sensor, it consumes resources by occupying one worker slot. If the target task never completes, these sensors will keep checking indefinitely while hogging the worker slot. Over time, we can run into a Sensor Deadlock, where all worker slots become occupied by useless sensors and no tasks can run anymore. Therefore, it’s best practice to set a maximum time limit for the checks.
  9. poke_interval is the duration before the sensor checks again if the previous check fails. The rationale is that we don’t want the sensor to check excessively like a madman, as it adds unnecessary loads to the server. On the flip side, checking too infrequently means the sensor will wait longer than necessary, delaying the pipeline. The trick is to find the sweet spot based on the expected run time of the external task.
  10. mode is how we want the sensor to behave. It can be set to "poke" or "reschedule". When set to "poke", the sensor goes to sleep on failure and wakes up on the next poke interval to try again. It’s like being on standby mode. The sensor will be more reactive, but since it’s on standby, the worker slot remains occupied throughout the whole process. When set to "reschedule", the sensor will check once. If the check fails, the sensor will schedule another check at a later time but terminates itself for now, freeing up the worker slot. Airflow recommends using "reschedule" if the poke interval is greater than 60 seconds.
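
As a quick sanity check on the sample values above: timeout=1800 and poke_interval=300 are both in seconds, so the sensor checks roughly once every 5 minutes and gives up after 30 minutes – at most around 6 attempts before it fails.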

Alright, that’s just about every parameter we need to know about the External Task Sensor. Granted, this list is not exhaustive, but knowing these 10 parameters will be more than enough for us to set up our External Task Sensor properly for virtually all use cases.

For completeness’ sake, I’ll include Airflow’s official documentation for those who are eager to explore it in more detail.

What are Execution Delta and Execution Date Function?

In the section above, I’ve glossed over these two parameters because they are arguably the most notorious, annoying, and confusing part of external task sensors. But I think it’s time we tackle them.

So what are execution_delta and execution_date_fn?

Building on our analogy, external_task_id tells the sensor to check if Jamie completed the make_dough() task. But she makes a lot of dough – once every hour. Are we checking if she baked in the past hour, yesterday, or last week?

This ambiguity confuses External Task Sensors and that’s why Airflow came up with two ways for us to communicate this information. Both execution_delta and execution_date_fn are meant to tell sensors the specific time of the task.

  1. execution_delta expresses time on a relative basis, e.g.: "Did Jamie bake 30 minutes ago?" It accepts a datetime.timedelta object as its argument, e.g: datetime.timedelta(minutes=30).
  2. execution_date_fn expresses time on an absolute basis, e.g.: "Did Jamie bake on the 3rd May 2023 at 4.30 pm?" It accepts a callable Python function as its argument. This function should return the execution date of the task that we want to check on, e.g.: datetime.datetime(year=2023, month=5, day=3, hour=16, minute=30). A minimal sketch of both options follows right after this list.
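
To make the distinction concrete, here is a minimal sketch of both options side by side. The DAG, task, and function names follow the bakery example; the start date, schedule, and the 30-minute logic are placeholders assumed purely for illustration.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    dag_id='gordon_tasks',
    start_date=datetime(2023, 1, 1),   # placeholder start date
    schedule_interval='30 * * * *',    # placeholder: hourly, at minute 30
) as gordon_tasks:

    # Option 1: relative - "Jamie's run happened 30 minutes before mine"
    check_dough_with_delta = ExternalTaskSensor(
        task_id='check_dough_freshness_delta',
        external_dag_id='jamie_tasks',
        external_task_id='make_new_dough',
        execution_delta=timedelta(minutes=30),
    )

    # Option 2: absolute - a callable that receives Gordon's execution date
    # and must return Jamie's execution date
    def thirty_minutes_earlier(gordon_exec_date):
        return gordon_exec_date - timedelta(minutes=30)  # placeholder logic

    check_dough_with_fn = ExternalTaskSensor(
        task_id='check_dough_freshness_fn',
        external_dag_id='jamie_tasks',
        external_task_id='make_new_dough',
        execution_date_fn=thirty_minutes_earlier,
    )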

Since both of them convey the same information, Airflow only allows us to use one or the other, but not both at the same time.

I generally use execution_delta as the de-facto choice. But, there are scenarios where it’s too complicated to calculate the execution_delta. In that case, I’d use execution_date_fn instead.

How to calculate execution_delta?

The term execution_delta is short for the delta (a.k.a. the difference) of execution dates (a.k.a. the previous runtimes of our tasks).

The formula for execution_delta. Image by author.
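
In plain terms, the formula is: execution_delta = (the current DAG’s execution date) minus (the external DAG’s execution date), where both execution dates refer to the previous runs of their respective DAGs.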

I’d like to highlight the keyword here – "previous".

Some of you may be wondering… Why does Airflow want the time difference of previous runs, but not the current runs? This used to confuse the crap out of me when I first started using Airflow.

Turns out there is a perfectly good reason. However, I don’t want to derail from the topic at hand so I will include it in the later section (here). For now, let’s just accept the formula as-is and see how we would apply this.

Suppose that Jamie makes dough every hour (e.g.: 13:00, 14:00, 15:00, …). Gordon also makes cookies every hour, but he makes them at the 30th minute of every hour (e.g.: 13:30, 14:30, 15:30, …).

At 14:30 sharp, Gordon gets ready to bake his cookies. Before he starts, he needs to check whether Jamie made fresh dough recently. The latest run of make_dough() would be the 14:00 run.

This time series shows the task dependencies between Jamie and Gordon. Gordon always checks whether Jamie completed her task half an hour ago. Chef (F) and Chef (M) icons by Freepik.

Given that both Gordon’s and Jamie’s tasks are scheduled hourly, their execution dates (a.k.a. previous runs) for the 14:30 run would be…

  • Gordon’s execution date = 14:30 – 1 hour = 13:30
  • Jamie’s execution date = 14:00 – 1 hour = 13:00

We can plug these values into the formula, and voilà!

The execution_delta comes out to be datetime.timedelta(minutes=30) for one specific run. Image by author.
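
We can double-check this with a quick bit of datetime arithmetic. The calendar date below is arbitrary; only the times matter here.

from datetime import datetime

gordon_exec_date = datetime(2023, 5, 3, 13, 30)  # Gordon's previous run, 13:30
jamie_exec_date = datetime(2023, 5, 3, 13, 0)    # Jamie's previous run, 13:00

execution_delta = gordon_exec_date - jamie_exec_date
print(execution_delta)  # -> 0:30:00, i.e. timedelta(minutes=30)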

You can do the same calculation for different runs of the tasks to get their respective execution_delta.

When calculating execution delta, it’s helpful to lay them out in a format like this. We want to calculate the execution deltas for multiple runs, not just one, in order to make sure they’re all the same! Image by author.

In this (cherry-picked) example, all of the execution_deltas turn out to be exactly the same. We can pass this value to our External Task Sensor and everything will work.

from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['[email protected]', '[email protected]'],
    execution_delta=timedelta(minutes=30),  # Pass the execution delta here
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

But-!

The execution_delta can be different sometimes. This usually happens when the schedule intervals of the two DAGs are different (e.g.: daily vs weekly, daily vs monthly, …).

For example, let’s say that Jamie makes her dough weekly on Sunday at 14:00, but Gordon makes his cookies daily at 14:30.

The arrow between Jamie’s task and Gordon’s sensor represents the execution delta. The execution delta gets longer over the week until it resets again on Sunday. Chef (F) and Chef (M) by Freepik.

If we do the same calculations, you will see that the execution deltas differ for every run.

Note that execution deltas can vary for different runs. Image by author.

This becomes a problem because execution_delta only accepts a single timedelta object as its argument. We can’t pass in a different execution_delta for every run.

In cases like this, we need execution_date_fn.

How to calculate Execution Date Function?

The execution_date_fn is just a regular Python function. As with all Python functions, it takes some argument(s) and returns some output(s). But the beauty of using a function is the ability to return a different output based on the function’s inputs and logic.

In the case of execution_date_fn, Airflow passes the current task’s execution date as an argument and expects the function to return the external task’s execution date. Note that these execution dates need to be expressed in UTC time.

def my_exec_date_fn(gordon_exec_date):
    # Add your logic here.
    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['[email protected]', '[email protected]'],
    execution_date_fn=my_exec_date_fn,  # Pass the function here.
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

Based on our earlier case study, our execution_date_fn would need to do the following…

My Airflow is configured to local time (GMT+8), so I need to deduct 8 hours to get the UTC time. Image by author.
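
Concretely, with GMT+8 as the local timezone: Jamie’s Sunday 14:00 run becomes 06:00 UTC, and Gordon’s daily 14:30 run becomes 06:30 UTC. That is why the datetimes below use hour=6, minute=0 for Jamie and hour=6, minute=30 for Gordon.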

One naive way could be hardcoding every single run, until the end of time.

# The naive way (This is a bad practice. Don't do this.)
def my_exec_date_fn(gordon_exec_date):
    if gordon_exec_date == datetime(year=2023,month=3,day=14,hour=6,minute=30):
        jamie_exec_date = datetime(year=2023,month=3,day=5,hour=6,minute=0)
    elif gordon_exec_date == datetime(year=2023,month=3,day=15,hour=6,minute=30):
        jamie_exec_date = datetime(year=2023,month=3,day=5,hour=6,minute=0)
    elif gordon_exec_date == datetime(year=2023,month=3,day=16,hour=6,minute=30):
        jamie_exec_date = datetime(year=2023,month=3,day=5,hour=6,minute=0)
    elif gordon_exec_date == datetime(year=2023,month=3,day=17,hour=6,minute=30):
        jamie_exec_date = datetime(year=2023,month=3,day=5,hour=6,minute=0)
    ...

    return jamie_exec_date

This works but it is definitely not the most efficient way.

A better approach is to look for consistent patterns and use that to programmatically derive the outputs. Usually, a good place to look for patterns is the execution_delta, since it contains the relationship between the execution dates (we talked about this here).

Additionally, we can also look at datetime attributes, such as the day of the week. If we really think about it, our External Task Sensor will always be pointing to a Sunday because Jamie only makes dough on Sunday. As we move through the week, Gordon’s task date will be further and further away from this Sunday until it resets again the next Sunday. Then, it repeats.

This is showing the time difference between the current runs for simplicity’s sake. Execution_date_fn looks at previous runs, but we will see the same patterns there too. Chef (F) and Chef (M) icons by Freepik.

This suggests that day of the week can also be helpful in coming up with our execution_date_fn. So let’s add the day of the week to our table. I’ll be labeling Monday as 1 and Sunday as 7 as per the ISO 8601 standard.
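
In Python, this numbering is exactly what datetime.isoweekday() returns, which the function further below relies on:

from datetime import datetime

print(datetime(2023, 3, 5).isoweekday())   # -> 7 (Sunday, Jamie's baking day)
print(datetime(2023, 3, 15).isoweekday())  # -> 3 (Wednesday)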

The numbers in brackets are the day of the week, where Monday is 1 and Sunday is 7. Image by author.

By labeling them, it becomes immediately clear that…

  • The execution_delta starts at 6 days (plus 30 minutes) on a Saturday.
  • The execution_delta increases by 1 day every day, up to a maximum of 12 days on a Friday.
  • The execution_delta then resets back to 6 days on the next Saturday.

We can re-create that relationship in a Python function and assign this execution_date_fn to our External Task Sensor.

from datetime import timedelta

def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()  # Monday = 1, ..., Sunday = 7

    if day_of_week in (6, 7):  # Saturday or Sunday
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):  # Monday to Friday
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff

    return jamie_exec_date

ext_task_sensor = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['[email protected]', '[email protected]'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)
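
As a quick sanity check, we can call the function with one of the dates from the naive example above and confirm it lands on Jamie’s Sunday run:

from datetime import datetime

# Gordon's execution date of Wednesday 15 Mar 2023, 06:30 UTC
print(my_exec_date_fn(datetime(2023, 3, 15, 6, 30)))
# -> 2023-03-05 06:00:00, Jamie's previous Sunday execution date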

There we have it – our very own execution_date_fn. With a bit of creativity, execution_date_fn can cater to any scenario.

How do we fit the External Task Sensor into our DAGs?

Up until this point, we’ve covered everything you need to know to get started with External Task Sensor. In this section, I thought it’d be nice to collate all of the things we’ve learned to see how the pieces fit together in our data pipelines.

First of all, we’ll create Jamie’s DAG in a file called jamie_dag.py.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Define task 1
def make_dough():
    # include your secret recipe here!
    return dough

# Create DAG
jamie_tasks = DAG(
    dag_id='jamie_tasks',
    description='Jamie to do list. (a.k.a making dough only)',
    schedule_interval='0 14 * * 0',  # weekly, every Sunday at 14:00 (local time)
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=jamie_tasks,
    task_id='start'
)

# Include task 1 in DAG
make_new_dough = PythonOperator(
    dag=jamie_tasks,
    task_id='make_new_dough',  # the task our External Task Sensor will look for
    python_callable=make_dough,
    ...
)

# Create dependencies (deciding the sequence of tasks to run)
start >> make_new_dough

Then, we’ll create Gordon’s DAG in another file called gordon_dag.py.

from datetime import timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor

# Define task 1
def bake_cookies():
    # include your secret recipe here!
    return cookies

# Define task 2
def make_money():
    # include your money making technique step-by-step here.
    return money

# Define execution_date_fn for sensor 1
def my_exec_date_fn(gordon_exec_date):
    day_of_week = gordon_exec_date.isoweekday()  # Monday = 1, ..., Sunday = 7

    if day_of_week in (6, 7):  # Saturday or Sunday
        time_diff = timedelta(days=day_of_week, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff
    elif day_of_week in (1, 2, 3, 4, 5):  # Monday to Friday
        time_diff = timedelta(days=day_of_week + 7, minutes=30)
        jamie_exec_date = gordon_exec_date - time_diff

    return jamie_exec_date

# Create DAG
gordon_tasks = DAG(
    dag_id='gordon_tasks',
    description='List of things that Gordon needs to do.',
    schedule_interval='30 14 * * *',  # daily at 14:30 (local time)
    ...
)

# Include task 0 in DAG (as a starting point)
start = DummyOperator(
    dag=gordon_tasks,
    task_id='start'
)

# Include task 1 in DAG
bake_cookies = PythonOperator(
    dag=gordon_tasks,
    task_id='bake_cookies',
    python_callable=bake_cookies,
    ...
)

# Include task 2 in DAG
make_money = PythonOperator(
    dag=gordon_tasks,
    task_id='make_money',
    python_callable=make_money,
    ...
)

# Create sensor 1
check_dough_freshness = ExternalTaskSensor(
    dag=gordon_tasks,
    task_id='check_dough_freshness',
    external_dag_id='jamie_tasks',
    external_task_id='make_new_dough',
    email=['[email protected]', '[email protected]'],
    execution_date_fn=my_exec_date_fn,
    timeout=1800,
    poke_interval=300,
    mode='reschedule'
)

# Create dependencies (deciding the sequence of task to run)
(start
    >> check_dough_freshness
    >> bake_cookies
    >> make_money)

Note that External Task Sensor is in gordon_dag.py and not jamie_dag.py since we want Gordon to be checking on Jamie, not the other way around. Gordon’s DAG would be the current DAG and Jamie the external DAG.

And… there we have it!

We’ve created our very first External Task Sensor, check_dough_freshness. This sensor will check whether Jamie’s make_new_dough() task succeeded or failed. If it failed, bake_cookies() and make_money() will not run.

Bonus: Concept of Dates in Airflow

Dates in Apache Airflow are confusing because there are so many date-related terminologies, such as start_date, end_date, schedule_interval, execution_date, etc. It’s a mess, really. But let’s try and figure it out with a story.

Suppose that our boss wants to know the sales performance of his company. He wants this data to be refreshed every day at 12 midnight for the next 6 months.

First, we write a complicated SQL query that generates the sales performance data. It takes 6 hours to run the query.

  • task_start is the start time of a task.
  • task_end is the end time of a task.
  • task_duration is the time it takes to run the task.
A single task. Image by author.

Every day, we will need to run this task at 12 midnight.

A single task, scheduled at 12am and runs for 6 hours. Image by author.

To automate this query, we create an Airflow DAG and specify the start_date and end_date. Airflow will execute the DAG as long as today’s date falls within this period.

An Airflow DAG. Image by author.

Then, we put the task into the Airflow DAG.

We need this data refreshed once a day at 12 midnight. So, we set the schedule_interval to "0 0 * * *", which is the CRON equivalent of daily at 12 midnight.

The schedule_interval essentially adds a delay between consecutive runs, telling Airflow to only run the task at a specific time, since we don’t want the task to re-run as soon as it finishes.

  • interval_start refers to the start time of a particular schedule interval.
  • interval_end refers to the end time of a particular schedule interval.
Note that interval_start and interval_end can overlap. The interval_end of the previous schedule interval will be the same as the interval_start of the next schedule interval. Image by author.

Here comes the most mind-blowing part – although seemingly counterintuitive, Airflow Scheduler triggers a DAG run at the end of its schedule interval, rather than at the beginning of it.

This means that Airflow will not do anything in the first-ever schedule interval. Our query will run for the first time on 2nd Jan 2023 at 12 am.

The colored bars are like data. All the "yellow" data only gets summarized on 2nd Jan. Image by author.

This is because Airflow was originally created as an ETL tool. It’s built on the idea that data from a period of time gets summarized at the end of that interval.

For example, if we wanted to know the sales of cookies for the 1st of January, we wouldn’t create a sales report on the 1st of January at 1 pm because the day hasn’t ended yet and the sales number would be incomplete. Instead, we would only process the data when the clock strikes 12 midnight. Today, we will be processing yesterday’s data.

Why is this important?

Since we are summarizing the previous interval’s data, the sales report we produce on the 2nd of Jan describes the 1st of Jan’s sales, not the 2nd of Jan’s sales.

For that reason, Airflow finds it more meaningful to refer to this run as the 1st of Jan run even though it’s executed on the 2nd. To better differentiate the dates, Airflow gives a special name to the beginning of a schedule interval—execution_date.

Although we run the "yellow" task on 2nd Jan, its execution date is actually 1st Jan. Image by author.
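
To put numbers on it for our daily midnight schedule:

  • interval_start (the execution_date) = 1 Jan 2023, 00:00
  • interval_end (when Airflow actually triggers the run) = 2 Jan 2023, 00:00

So the run that kicks off on the 2nd of Jan processes the 1st of Jan’s data, and Airflow labels it with an execution_date of the 1st of Jan.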

This is why we always take the difference of the "previous" runs when we calculate execution_delta: it is the delta of the execution_dates, and an execution_date essentially points to the "previous" run.

Concluding Remarks

External Task Sensors are like gatekeepers. They stop bad data from going downstream by making sure that tasks are executed in a specific order and that the necessary dependencies are met before proceeding with subsequent tasks.

For those who have never used External Task Sensors before, I hope the article was able to convey its importance and convince you to start using them. For those who have been using them, I hope some of the insights here are able to help deepen your understanding.

Thank you for your time, and have a great day.

