BigData/ETL: 4 Easy steps to setting up an ETL Data pipeline from scratch

Setting up an ETL pipeline with just a few commands

Burhanuddin Bhopalwala
Towards Data Science


ETL (Extract Transform Load)

What not to expect from this blog: managed ETL solutions such as AWS Glue, AWS Data Migration Service, or Apache Airflow. These cloud-based services are managed but not free, and they are not covered in this article.

Table of contents

  1. What is an ETL pipeline?
  2. What are the various use cases of an ETL pipeline?
  3. ETL prerequisites — Docker + Debezium + Kafka + Kafka Connect — Bird’s-eye view
  4. ETL setup — A 4 step process

1: What is an ETL pipeline?

ETL stands for Extract, Transform, Load. An ETL pipeline is used for setting up a Data warehouse or a Data lake.

Note: A Data warehouse collects structured data from multiple sources such as relational databases, whereas a Data lake stores both structured & unstructured data.

2: What are the various use cases of an ETL pipeline?

ETL has broad use cases, which are as follows:

  • Giving structure to unstructured data before storing it in a Data warehouse, which is generally used for structured data from multiple sources.
  • Building the data pipeline that ML engineers use to get data for training models. This is quite often the first task for an ML engineer / Data Scientist (L1).
  • Creating a backup or staging data source.

Note: In this example, we will use MySQL as the source database and Elasticsearch as the destination, which integrates out of the box with Kibana for Data Visualization and Machine Learning.

3: ETL prerequisites — Docker + Debezium + Kafka + Kafka Connect — Bird’s-eye view

MySQL (source) + Debezium + Kafka / Kafka Connect + Elasticsearch (destination) + Kibana

As you can see from the picture above, we are going to use the following:

  • Docker: a containerization platform. We are using Docker for simplicity. https://www.docker.com/
  • Debezium: Debezium is a Change Data Capture (CDC) tool. It tracks every event (insert, update, delete) in the source DB and pushes it to Kafka using Kafka Connect. It reads the source database's transaction logs to capture every change and emits an event for each one (see the sample change event right after this list).

Note: In MySQL this log is called the binlog; in PostgreSQL it is called the WAL (Write-Ahead Log).

  • Kafka Connect: as the name suggests, it is the framework that connects Debezium to Kafka, streaming the captured events into Kafka topics.
  • Kafka: Kafka handles event streaming and consumption in real time. Kafka works with ZooKeeper for cluster coordination. https://bit.ly/2Gb9Sm7
  • ELK (destination): We use Elasticsearch as our destination data store, which integrates by default with Kibana for Data Visualization & Machine Learning; together with Logstash, this is popularly known as the ELK stack (Elasticsearch + Logstash + Kibana). https://bit.ly/36dmioe
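
For concreteness, here is a simplified Debezium change event for an insert. The envelope fields (before, after, source, op, ts_ms) follow Debezium's documented format, while the table and values are hypothetical:

{
  "payload": {
    "before": null,
    "after": { "id": 1001, "email": "user@example.com" },
    "source": { "name": "dbserver1", "db": "<database_name>", "table": "<table_name>" },
    "op": "c",
    "ts_ms": 1596000000000
  }
}

The consumer in Step 4 mainly needs payload.after (the row state after the change) to load the record into the destination.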

4: Let’s start setting this up — A 4-step process

Step 1: Change the MySQL binlog format to the one Debezium expects. Open /etc/my.cnf (the MySQL configuration file) and add the following configuration:

/etc/my.cnf (MySQL configuration file)
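
A typical minimal configuration that satisfies Debezium's MySQL connector requirements looks like the sketch below (the exact values, e.g. server-id, are illustrative and may differ from the original snippet):

[mysqld]
server-id        = 223344      # any unique, non-zero server ID
log_bin          = mysql-bin   # enable binary logging
binlog_format    = ROW         # Debezium requires row-based binlog events
binlog_row_image = FULL        # log full row images for updates/deletes
expire_logs_days = 10

Restart MySQL after changing these settings so the new binlog format takes effect.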

Step 2: Start ZooKeeper, Kafka & Kafka Connect using Docker:

$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.0
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.0

Note: If you are familiar with Docker, you can use docker-compose.yaml instead. You can find it here: https://github.com/debezium/debezium-examples/blob/master/tutorial/
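
Optionally (a quick sanity check that is not part of the original four steps), you can verify that the Kafka Connect REST API is reachable before registering anything; an empty list means Connect is up but has no connectors yet:

$ curl -H "Accept:application/json" localhost:8083/connectors/
[]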

Step 3 (Extract): We’ll use curl to submit a JSON request to our Kafka Connect service, which registers a Debezium connector and starts capturing events from the source DB (it needs the source DB credentials, as shown below):

curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  localhost:8083/connectors/ -d '{
    "name": "etl-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "tasks.max": "1",
      "database.hostname": "<mysql_host>",
      "database.port": "3306",
      "database.user": "<mysql_username>",
      "database.password": "<mysql_password>",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.whitelist": "<database_name>",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "dbhistory.<db_name>"
    }
  }'

Step 4 (Transform & Load): The final step is to write a Kafka consumer. The consumer is simply a small function/program that reads the Debezium events, transforms them, and loads them into the ELK destination.

Please find the entire reference source code template here: https://github.com/burhanuddinbhopalwala/etl-elasticsearch-app
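
The linked repository is the reference implementation. Purely for orientation, here is a minimal Python sketch of the same idea; the topic name, index name, and client libraries (kafka-python and elasticsearch) are my assumptions and are not taken from that repository:

import json

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from kafka import KafkaConsumer

# Hypothetical names: Debezium topics follow <server.name>.<database>.<table>.
TOPIC = "dbserver1.<database_name>.<table_name>"
ES_INDEX = "etl_index"

es = Elasticsearch("http://localhost:9200")
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
)

def to_actions(messages):
    # Transform: keep only the row state after the change ("payload.after").
    for msg in messages:
        if msg.value is None:  # skip tombstone messages
            continue
        payload = msg.value.get("payload", msg.value)
        row = payload.get("after")
        if row:
            yield {"_index": ES_INDEX, "_id": row.get("id"), "_source": row}

buffer = []
for message in consumer:
    buffer.append(message)
    if len(buffer) >= 500:
        # Load: bulk-insert the batch into Elasticsearch.
        bulk(es, to_actions(buffer))
        buffer.clear()

The 500-message batch size here simply mirrors the "WAITING FOR 500 MESSAGES" lines in the logs below; deletes and schema changes are ignored in the sketch for brevity.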

It’s done! In this example, we are using bulk insertion into Elasticsearch. You can see the logs below, produced by the source code above.

...
2017-09-21 07:38:48,385 INFO MySQL|dbserver1|task Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:38:48,402 INFO MySQL|dbserver1|task Successfully joined group inventory-connector-dbhistory with generation 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,403 INFO MySQL|dbserver1|task Setting newly assigned partitions [dbhistory.inventory-0] for group inventory-connect
WorkerSourceTask{id=inventory-connector-0} finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
INFO -- : CREATING MASTER DB CONNECTION
INFO -- : CONNECT ELASTICSEARCH
INFO -- : CONNECTED KAFKA
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 1, ID: 685475
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 2, ID: 457548
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 3, ID: 985484
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 4, ID: 258547
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 5, ID: 257544

Errors: If you run into any errors, you can always check the Debezium website: https://debezium.io/.

Lastly, thanks for reading. I hope you found this blog helpful. And as always, remember to breathe :)
