
The Nice Way To Manage Your Data Science Workflow

Learn to build an ETL pipeline with Prefect in a beginner example

As Data Scientists, we take a significant amount of pride in our work. We try to write good, modular code with logging and tests to make sure it works as expected. But unfortunately, the perfect Extract, Transform, and Load (ETL) pipeline does not exist, however much effort we put into designing it.

In fact, a pipeline’s failure to run smoothly sometimes has nothing to do with the quality of the code. There are factors that are invisible even to our trained eyes, and they take quite a lot of time and effort to catch.

So, what if I told you that there was a way to make sure that you can see where it all went wrong, in case it goes wrong? That there was a perfect tool that stays invisible when your code runs nicely and becomes visible only when your code fails to run as desired?

What if I said that there was a tool that helped you make sure that your code fails successfully?

Welcome to the Prefect world.

Prefect helps you handle workflow management proficiently and transparently. It is a tool that combines a variety of features to make sure your code has all it needs to execute or fail successfully.

In this article, I discuss some of the basic concepts you need to start using Prefect and walk through building a simple ETL pipeline.

Let’s get started 👇

Installing Prefect and Some Simple Definitions

Prefect can be installed simply with pip:

pip install prefect # please do it inside a virtual environment

We also need the requests and pandas libraries for this tutorial. Install them simply:

pip install requests pandas # also inside a virtual environment

The Prefect documentation describes tasks as follows:

Prefect tasks are functions that have special rules about when they should run: they optionally take inputs, perform some work, and optionally return an output.

So they are used to do some work, or some part of the work.

In Prefect, workflows are called "Flows", and the documentation describes them like this:

Workflows (or "flows") are containers for tasks. Flows represent the dependency structure between tasks, but do not perform any logic.

So with flows, you can bring together several tasks to perform a particular set of desired functions. It’s more like a pipeline of tasks.
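To make these two ideas concrete, here’s a minimal sketch of a task and a flow using the Prefect 1.x API that this tutorial is based on (the task name and flow name are just placeholders):

from prefect import task, Flow

@task
def say_hello(name):
    # a task is just a decorated Python function
    print(f"Hello, {name}!")

with Flow("hello-flow") as flow:
    # calling the task inside the Flow context only registers it in the flow;
    # nothing executes yet
    say_hello("Prefect")

flow.run()  # the task actually runs here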

Okay, now that we’ve done that, let’s see how to build one.

Building Our Tasks

Our goal in this tutorial is simple – fetch a list of users with the help of the free Random User API and save them into a new CSV file upon each run of our workflow.

Sounds simple enough?

Great. Let’s see what components we need to build for this.

Let’s import our libraries first, not too many, I assure you:

import requests
import pandas as pd
from prefect import task, Flow
import json

Now, first of all, we divide our workflow into three tasks (functions):

Extract

We will get a specific number of users from the API in this function.

We use the requests library’s get function to fetch a list of random users.

@task
def extract(url_from):
    # request a batch of random users from the API
    response = requests.get(url_from)
    if response:
        # parse the JSON body and keep only the "results" list
        return json.loads(response.content)["results"]
    else:
        print("No response gotten.")

Notice the @task decorator in there? That’s all that’s needed for making the function into a task.
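Before we transform the data, it helps to know what that results list looks like. With the query parameters we’ll use later (inc=gender,name,nat), each entry is roughly of the shape below (the field names follow the randomuser.me docs; the values here are made up):

# illustrative shape of one entry in the "results" list returned by extract()
sample_person = {
    "gender": "female",
    "name": {"title": "Ms", "first": "Jane", "last": "Doe"},
    "nat": "AU",
}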

Next up is the function we will use to transform our user JSON into a dataframe.

Transform

This is where we convert our JSON list of users into a list people_list of individual dictionaries containing the data of each person.

We are going to extract only three features from each person’s response:

  1. name = first name + last name
  2. gender, and
  3. nationality

@task
def transform(data_dict):
    people_list = []
    for person in data_dict:
        single_item = {
            "gender": person["gender"],
            "name": person["name"]["title"] + " " + person["name"]["first"] + " " + person["name"]["last"],
            "nat": person["nat"],
        }
        people_list.append(single_item)
    # return a dataframe built from the list of dicts
    return pd.DataFrame(people_list)

Finally, we have our Load function.

Load

This function does the simple task of saving a CSV file to our local directory.

We specify data_df, the dataframe we built in the transform step, and the filename to give to the saved CSV, as parameters for this function.

@task
def load(data_df, filename):
    # write the dataframe to a CSV file in the current working directory
    data_df.to_csv(f"{filename}.csv", index=False)
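If you want a quick sanity check after a run, you can read the file straight back (the filename here assumes the three-person run we trigger at the end of the article):

import pandas as pd

# inspect the CSV that the load task wrote to disk
print(pd.read_csv("3_people.csv").head())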

Now, let’s go ahead and build an entire workflow out of it.

Making a Flow

At this point, we have defined our individual functions that perform one bit of task on their own. Now, we want to bring them together and establish a pipeline. Specifically, an ETL pipeline.

We do that with Flow.

def start_data_collection(num_people_to_fetch):
    with Flow("Random User API ETL:") as flow:
        # get a specific number of people profiles upon each request
        people = extract(f'https://randomuser.me/api/?inc=gender,name,nat&results={num_people_to_fetch}')
        # make a dataframe out of the response
        user_df = transform(people)
        # save the dataframe formed to disk
        load(user_df, f'{num_people_to_fetch}_people')
    return flow

Here is what we do in this function:

  • We make a new Flow object with a name.
  • We start the pipeline with our extract method and fetch some users
  • Then we transform those users into a pandas DataFrame
  • Finally, we save the DataFrame as a csv file with the filename we specify as an argument

We do all these tasks one by one and then return our Flow object.

The only difference from simply calling our functions in sequence is that we call them inside the Flow context. Inside that context, the tasks don’t execute immediately – Prefect just records them and their dependencies, and they only run when we call flow.run() later.
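If you have graphviz installed, Prefect 1.x can also draw the dependency graph of a flow for you, which is a handy way to confirm the tasks are wired up the way you expect (this step is optional and not needed for the rest of the tutorial):

flow = start_data_collection(3)
flow.visualize()  # renders the extract -> transform -> load DAG; requires graphviz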

Testing our ETL workflow

The last step is to test what we built. Here’s a simple main block to run our flow object:

if __name__ == "__main__":
    flow = start_data_collection(3)
    flow.run()

And that’s it. You’re done.

Run the script from the command line with:

python main.py

When the flow finishes, you’ll find a CSV file named 3_people.csv saved to disk in the same directory.

Concluding…

So here it is – a simple ETL workflow to help you get started with Prefect. I would recommend that you try to replicate this example on your own and include some additional concepts in it, such as failure detection, retry support, and so on.
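As a pointer in that direction, here’s a rough sketch of how retries can be added to a task with the Prefect 1.x API (the numbers are arbitrary):

from datetime import timedelta
from prefect import task

@task(max_retries=3, retry_delay=timedelta(seconds=10))
def extract(url_from):
    # if this task raises an exception, Prefect will retry it up to
    # three more times, waiting ten seconds between attempts
    ...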

If you want to refer to the whole code, here is my GitHub repo with all the resources that I use in my articles. Star ⭐️ it and bookmark it.

GitHub – yashprakash13/data-another-day: A master repository of all Data Science projects…


One Final Thing

I hope this article was a simple and helpful read. I write about Data Science and Programming regularly and do my best to make it fun and engaging as well.

So, stay connected and come along the journey with me. 🙂

👉 Here’s the best way to get all my articles, top resources and insights that I share frequently with my 100+ subscribers.

👉 Follow me on Medium to always be in the loop.

