BigData/ETL: 4 Easy steps to setting up an ETL Data pipeline from scratch
Setting up an ETL pipeline within a few commands
What not to expect from this blog: managed ETL solutions like AWS Glue, AWS Data Migration Service, or Apache Airflow. Cloud-based services are managed but not free, and they are not covered in this article.
Table of contents
- What is an ETL pipeline?
- What are the various use cases of an ETL pipeline?
- ETL prerequisites — Docker + Debezium + Kafka + Kafka Connect — Bird’s-eye view
- ETL setup — A 4 step process
1: What is an ETL pipeline?
ETL stands for Extract, Transform, Load. An ETL pipeline is commonly used to set up a Data warehouse or Data lake.
Note: A Data warehouse collects structured data from multiple sources, such as relational databases, whereas a Data lake stores both structured and unstructured data.
2: What are the various use cases of an ETL pipeline?
ETL has broad use cases, including the following:
- Giving structure to unstructured data before storing it in a Data warehouse, which is generally used for structured data from multiple sources.
- Building the data pipeline that supplies ML engineers with data for training models. This is quite often the first task of an ML engineer / Data Scientist.
- Creating a backup or a staging data source.
Note: In this example, we will use MySQL as the source database and Elasticsearch as the destination, which integrates out of the box with Kibana for Data Visualization and Machine Learning.
3: ETL prerequisites — Docker + Debezium + Kafka + Kafka Connect — Bird’s-eye view
We are going to use the following components:
- Docker: A Container Management System (CMS). We are using Docker for simplicity. https://www.docker.com/
- Debezium: Debezium is a Change Data Capture (CDC) platform. It tracks every event (insert, update, delete) on the source DB and pushes each event to Kafka via Kafka Connect. It reads the source database’s transaction log to capture every transaction and emit a corresponding event.
Note: In MySQL this log is called the binlog; in PostgreSQL it is the WAL (Write-Ahead Log).
- Kafka Connect: As the name suggests, a framework for streaming data between Kafka and other systems. Debezium runs as a source connector inside Kafka Connect.
- Kafka: Kafka handles event streaming and real-time consumption. Kafka works with ZooKeeper for cluster coordination. https://bit.ly/2Gb9Sm7
- ELK (destination): We use Elasticsearch as our destination data store, which integrates by default with Kibana for Data Visualization and Machine Learning. Together with Logstash, this is popularly known as the ELK stack (Elasticsearch + Logstash + Kibana). https://bit.ly/36dmioe
4: Let’s start setting this up — A 4 Step process
Step 1: Changing the MySQL binlog format to what Debezium expects: open /etc/my.cnf (the MySQL configuration file) and add the following configuration:
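A typical Debezium-friendly binlog configuration looks like this (a minimal sketch; the server-id value is arbitrary, it just needs to be unique in your replication topology):

```ini
[mysqld]
server-id         = 223344       # any unique ID
log_bin           = mysql-bin    # enable binary logging
binlog_format     = ROW          # Debezium requires row-level events
binlog_row_image  = FULL         # include the full row state in each event
expire_logs_days  = 10           # keep binlogs long enough for the connector
```

Restart MySQL after editing the file so the new binlog settings take effect.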
Step 2: Starting Zookeeper, Kafka & Kafka Connect using Docker:
$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.0
$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.0
Note: If you are familiar with Docker, You can use docker-compose.yaml. You can find it here: https://github.com/debezium/debezium-examples/blob/master/tutorial/
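For reference, the three containers above can also be sketched as a single docker-compose file (a minimal sketch mirroring the commands in Step 2, using the same images, ports, and topic names):

```yaml
version: "2"
services:
  zookeeper:
    image: debezium/zookeeper
    ports: ["2181:2181", "2888:2888", "3888:3888"]
  kafka:
    image: debezium/kafka:1.0
    ports: ["9092:9092"]
    links: [zookeeper]
  connect:
    image: debezium/connect:1.0
    ports: ["8083:8083"]
    links: [zookeeper, kafka]
    environment:
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
```

Then `docker-compose up` starts all three services together.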
Step 3 (Extract): We’ll use curl to submit a JSON request to our Kafka Connect service, which starts capturing events from the source DB using Debezium under the hood (it needs the source DB credentials, as shown below):
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  localhost:8083/connectors/ -d '{
  "name": "etl-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "<mysql_host>",
    "database.port": "3306",
    "database.user": "<mysql_username>",
    "database.password": "<mysql_password>",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "<database_name>",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "dbhistory.<db_name>"
  }
}'
Step 4 (Transform & Load): The final step is to write a Kafka consumer. The consumer is simply a function that reads the Debezium events, transforms them, and loads them into the ELK destination.
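As a sketch of the Transform step, here is a minimal pure-Python function that converts a Debezium change event into an Elasticsearch bulk action. The envelope fields (`payload`, `op`, `before`, `after`) follow Debezium’s event format; the index name and sample row are hypothetical, and in a real consumer the events would arrive from Kafka rather than a literal dict:

```python
def transform(event):
    """Turn a Debezium change event into an Elasticsearch bulk action."""
    payload = event["payload"]
    op = payload["op"]  # "c" = create, "u" = update, "d" = delete
    # For deletes the row state lives in "before"; otherwise in "after"
    row = payload["after"] if op != "d" else payload["before"]
    action = {
        "_index": "inventory",  # hypothetical index name
        "_id": row["id"],
        "_source": row,
    }
    if op == "d":
        # Bulk delete actions carry no document body
        action["_op_type"] = "delete"
        action.pop("_source")
    return action

# Example Debezium insert event (trimmed to the relevant fields)
event = {"payload": {"op": "c", "before": None,
                     "after": {"id": 1001, "name": "scooter", "qty": 3}}}
print(transform(event)["_id"])  # 1001
```

In the Load step, a consumer loop would accumulate these actions and flush them in batches (e.g. every 500 events) via the Elasticsearch bulk API.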
Please find the entire reference source code template here: https://github.com/burhanuddinbhopalwala/etl-elasticsearch-app
It’s done! In this example, we use bulk insertion into Elasticsearch. You can see the logs below from the above source code.
...
2017-09-21 07:38:48,385 INFO MySQL|dbserver1|task Kafka version : 0.11.0.0 [org.apache.kafka.common.utils.AppInfoParser]
2017-09-21 07:38:48,402 INFO MySQL|dbserver1|task Successfully joined group inventory-connector-dbhistory with generation 1 [org.apache.kafka.clients.consumer.internals.AbstractCoordinator]
2017-09-21 07:38:48,403 INFO MySQL|dbserver1|task Setting newly assigned partitions [dbhistory.inventory-0] for group inventory-connect
WorkerSourceTask{id=inventory-connector-0} finished initialization and start [org.apache.kafka.connect.runtime.WorkerSourceTask]
INFO -- : CREATING MASTER DB CONNECTION
INFO -- : CONNECT ELASTICSEARCH
INFO -- : CONNECTED KAFKA
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 1, ID: 685475
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 2, ID: 457548
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 3, ID: 985484
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 4, ID: 258547
INFO -- : WAITING FOR 500 MESSAGES, RECEIVED 5, ID: 257544
Errors: If you get any errors, the Debezium documentation is a good reference: https://debezium.io/.
Lastly, thanks for reading. I hope you found this blog helpful. And as always, remember to breathe :)
Connect 🤝:
- Email: bbhopalw@gmail
- Linkedin: www.linkedin.com/in/bbhoaplw
For further reading ✍️:
Big Data & Cloud Engineering blogs:
- Towards Data Science Publication: https://medium.com/@burhanuddinbhopalwala
Backend Engineering & Software Engineering blogs:
- DEV Community:
https://dev.to/burhanuddinbhopalwala