Introduction to Data Pipelines with Singer.io

Pavneet Singh
Towards Data Science
5 min read · Apr 18, 2021


Data pipelines play a crucial role in all kinds of data platforms, be it for predictive analytics, business intelligence, or plain ETL (Extract, Transform, Load) between heterogeneous data stores. They all rely on real-time or batch ingestion of data, which is then processed to derive insights and make predictions, or simply to summarize and reshape the data for reports. Let’s explore the anatomy of a data pipeline to get a better understanding of the concept, and then we’ll build one using Singer, an open-source ETL tool written in Python.

Photo by Sigmund on Unsplash

What is a data pipeline?

Just as a water pipeline moves water from a source to a destination, a data pipeline moves data from a source system to a destination system. A few scenarios in which data pipelines are used:

1) Synchronizing data between heterogeneous data stores. For example, synchronizing data from an Oracle Database instance to a PostgreSQL database instance.

2) Moving data from a staging area to production systems after cleaning, reshaping, and summarizing it. For example, data scraped from various websites may be collected in a staging area, made fit for consumption in a structured manner, and then transferred to a database.

3) Distributing segmented data across various sub-systems from a central data source. For example, survey data for various products collected by an organization in Google Sheets may be split and broadcast to the respective product teams for processing.

Key characteristics of a data pipeline

1) Data frequency: The rate at which the destination system requires the data, i.e. in small batches at regular intervals or in real time. The pipeline should be able to sustain the frequency of data transfer that the destination system requires.

2) Resiliency: How fault tolerant the data pipeline is, i.e. if the pipeline crashes due to a sudden data load or an overlooked code bug, there should be no loss of data.

3) Scalability: The tools and technology used to build the data pipeline must allow it to be reconfigured to scale out onto more hardware nodes if the data load increases.

What is Singer?

The core aim of Singer is to be “the open-source standard for writing scripts that move data”. It uses standardized scripts for data extraction and ingestion which can be mixed and matched with various sources and targets as required.

Core Features of Singer

1) Tap — The data source from which data is extracted is called a Tap. Ready-made Taps are available on the Singer site and can be used as-is, or custom Taps can be created.

2) Targets — The destination that consumes the data emitted by a Tap is known as a Target. As with Taps, we can use ready-made Targets from the Singer website or create our own.

3) Data exchange format — Singer uses JSON (JavaScript Object Notation) as the data exchange format, making it source/target agnostic.

4) Taps and Targets are easily combined with the Unix pipe operator, without the need for any daemons or complicated plugins.

5) It supports incremental extraction by maintaining state between invocations, i.e. a timestamp can be stored in a JSON file between invocations to record the last point up to which the Target consumed data (see the sketch after this list).
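As a rough illustration of point 5, here is a minimal sketch of how a Tap can emit state using the singer-python library. The stream name and the last_updated_at bookmark field are illustrative assumptions, not part of the demo that follows.

import singer

# Illustrative bookmark: the field name and value are assumptions for this sketch
state = {'bookmarks': {'employees': {'last_updated_at': '2021-04-18T00:00:00Z'}}}

# ...extract and write only the records newer than the bookmark...

# Emit a STATE message so the next invocation can resume from this point
singer.write_state(state)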

Components of a data pipeline in Singer

We can create custom Taps/Targets for Singer as per our requirements, or simply install and use the ones already available on the Singer website.
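Conceptually, every Singer pipeline has the same shape: a Tap writes JSON messages to stdout and a Target reads them from stdin, joined by a pipe. The tap/target names and config files below are placeholders, not specific tools:

some-tap --config tap_config.json | some-target --config target_config.json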

Building a data pipeline with Singer

Let’s create a data pipeline that fetches employee records from a REST API and inserts them into a PostgreSQL database table using Singer’s Tap and Target approach.

Prerequisites: A Python environment and a PostgreSQL Database instance.

Since Singer.io requires setting up a Tap and a Target, we’ll create separate virtualenvs for each to avoid version conflicts, as they may depend on different libraries. This is considered a best practice when working with Singer.

Setting up the Tap: Rest API

In this demo we will create our own Tap to fetch the employee records from the REST API.

1) Create and activate the virtualenv

python3 -m venv ~/.virtualenvs/Singer.io_rest_tap
source ~/.virtualenvs/Singer.io_rest_tap/bin/activate

2) Install the Singer python library

pip install singer-python

3) Open up a new file called tap_emp_api.py in your favorite editor and add the following code

import singer
import urllib.request
import json

# Here is a dummy JSON Schema for the sample data passed by our REST API.
schema = {'properties': {'id': {'type': 'string'},
                         'employee_name': {'type': 'string'},
                         'employee_salary': {'type': 'string'},
                         'employee_age': {'type': 'string'},
                         'profile_image': {'type': 'string'}}}

# Here we make the HTTP request and parse the response
with urllib.request.urlopen('http://dummy.restapiexample.com/api/v1/employees') as response:
    emp_data = json.loads(response.read().decode('utf-8'))

# Next we call singer.write_schema, which writes the schema of the 'employees' stream
singer.write_schema('employees', schema, 'id')

# Then we call singer.write_records to write the records to that stream
singer.write_records('employees', records=emp_data["data"])
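Before wiring it to a Target, you can run the Tap on its own to see the messages it writes to stdout. The abridged output below only illustrates the shape of Singer’s SCHEMA and RECORD messages; the actual field values depend on whatever the API returns.

python tap_emp_api.py

{"type": "SCHEMA", "stream": "employees", "schema": {"properties": {...}}, ...}
{"type": "RECORD", "stream": "employees", "record": {"id": "1", "employee_name": ..., ...}}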

Setting up the Target: PostgreSQL

A Target for PostgreSQL is already available on the Singer website so we will set up a virtualenv for installing it.

1) Create and activate the virtualenv

python3 -m venv ~/.virtualenvs/Singer.io_postgres_target
source ~/.virtualenvs/Singer.io_postgres_target/bin/activate

2) Install the PostgreSQL Target library

pip install singer-target-postgres

3) Next we need to create a config file at

~/.virtualenvs/Singer.io_postgres_target/bin/target_postgres_config.json

with the Postgres connection information and target schema, like this:

{
  "postgres_host": "localhost",
  "postgres_port": 5432,
  "postgres_database": "singer_demo",
  "postgres_username": "postgres",
  "postgres_password": "postgres",
  "postgres_schema": "public"
}

Using the Singer Data pipeline

Once we’ve set up the Tap and the Target, the pipeline can be run by simply connecting them with a pipe operator in the shell:

python ~/.virtualenvs/Singer.io_rest_tap/bin/tap_emp_api.py | ~/.virtualenvs/Singer.io_postgres_target/bin/target-postgres --config ~/.virtualenvs/Singer.io_postgres_target/bin/target_postgres_config.json

There you go! All the records fetched from the API will be inserted into a table as per the config file and the schema we defined for the Tap. Easy peasy, wasn’t it?!
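To double-check the load, you can query the new table from psql. The table name employees comes from the stream name in our Tap; the exact column layout depends on the target-postgres version, so treat this as a quick sanity check (connection values assume the config above).

psql -h localhost -U postgres -d singer_demo -c "SELECT * FROM public.employees LIMIT 5;"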

What’s Next?

Explore the Singer.io documentation and try out various Tap/Target combinations as per your use case or create your own Taps and Targets.

You can also explore data pipeline orchestration tools like Airflow and Luigi, which offer a wide range of features as well.

I’ll be coming up with some interesting guides on Airflow in my upcoming posts. Until then, stay happy plumbing your data pipelines!


