Machine Learning Streaming with Kafka, Debezium, and BentoML

Creating a real-time price recommender system using modern data-related tools

João Pedro
Towards Data Science


Photo by EJ Strat on Unsplash

Introduction

Recently, GitHub announced the long-awaited (and controversial) Copilot, an AI capable of generating and suggesting code snippets with considerably good performance.

However, Copilot is impressive not only for its suggestions, something already achieved in scientific papers, but mainly because it is an excellent product (and I say this as a user too), capable of serving real-time predictions to millions of developers simultaneously through simple text editor extensions.

As machine learning technologies mature, it becomes increasingly important to understand not only how AI models work and how to increase their performance, but also the technical part of how to put them into production and integrate them with other systems.

To exercise this part of “AI infrastructure”, in this post we will simulate a real situation (or almost), where it will be necessary to integrate a Machine Learning model with a “production” database to make real-time predictions as new records are added.

This post may get a little long, so roll up your sleeves and join me on this project.

The problem

Suppose we have a car-selling platform, where users can register and advertise their vehicles. As new cars are registered (in the database), the app should suggest (using our machine learning model) a price for the vehicle. Of course, this application needs to run in real time, so the user can quickly receive appropriate feedback.

Proposed app. Image by Author. Icons by Freepik.

To simulate the data, we’re going to use the Ford Used Car Listing dataset from Kaggle, a dataset containing the selling price of over 15k cars and their respective attributes (fuel type, mileage, model, etc.).

I previously ran some experiments on the dataset and found a good enough model (the full code will be available on GitHub), so let’s skip the data analysis/data science part and focus on our main goal: making the application work.

The proposed architecture

To solve our problem, we are going to need the following things: a way to detect when new entries are added to the database (Change Data Capture), an application to read these entries and predict the price with the machine learning model, and a way to write these entries back to the original database (with the price), all in real time.

Luckily, we don’t have to reinvent the wheel. The tools presented in the following sections will help us a lot, with little (or no) code.

CDC with Debezium & Kafka

Change Data Capture, or just CDC, is the act of monitoring and tracking the changes in a database. You can think of CDC as data gossip: every time something happens inside the database, the CDC tool listens and shares the message with its “friends”.

For example, if the entry (João, 21) is added to the table neighbors, the tool will whisper something like: {‘added’:{‘name’: ‘João’, ‘age’:21, ‘id’:214}}.

And this is very useful for many applications as the changes captured can be used for many tasks, like database synchronization, data processing, and Machine Learning, which is our case.

Debezium is an open-source tool specialized in CDC. It works by reading the logs of the database (in this case called the source) and transforming the detected changes into standardized, structured messages, formatted in Avro or JSON, so another application can consume them without worrying about what the source is.

Source CDC with Debezium. Image by Author. Icons by Freepik.

It can also work the other way around, receiving standardized messages that describe a change and reflecting it in the database (in this case called the sink).

Sink CDC with Debezium. Image by Author. Icons by Freepik.

Debezium is built on top of Apache Kafka, a famous open-source distributed event streaming platform used by many big companies, like Uber and Netflix, to move gigabytes of data daily. Because of this huge scalability when it comes to data movement, Kafka has immense potential to help machine learning models in production.

We don’t need to know a lot about Kafka for this project, just its basic concepts. In Kafka, we have a structure of topics, containing messages (literally just a string of bytes) written by a producer and read by a consumer. The latter two can be any application that’s able to connect with Kafka.
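To make these terms concrete, here is a toy, in-memory picture of a topic. ToyTopic is a hypothetical class written just for this illustration, not part of any Kafka client; real Kafka adds partitions, replication, and persistence on top of the same idea.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTopic:
    """In-memory stand-in for a Kafka topic: an append-only list of byte strings."""

    name: str
    messages: list = field(default_factory=list)

    def produce(self, value: bytes) -> int:
        """Append a message and return its offset (its position in the log)."""
        self.messages.append(value)
        return len(self.messages) - 1

    def consume(self, offset: int) -> list:
        """Return every message stored at or after the given offset."""
        return self.messages[offset:]


topic = ToyTopic("car_database.public.car_data")
topic.produce(b'{"model": "Fiesta"}')
topic.produce(b'{"model": "Focus"}')

# A consumer remembers the next offset it wants and asks for everything from there.
unread = topic.consume(offset=1)
```

The producer only appends and the consumer only reads from an offset, which is why the two never need to know about each other.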

It has proven to be an excellent tool for large-scale applications, which is definitely not our case with this simple project, but its ease of use makes up for any overhead added (in this project).

And that’s how our data moves: when Debezium is configured to watch some table in our database, it transforms the detected changes into standardized messages, serializes them into bytes, and sends them to a Kafka topic.

Then, another application can connect to that topic and consume the data for its needs.

Data movement. Image by Author. Icons by Freepik.

BentoML

BentoML is an open-source framework for serving ML models. It lets us version and deploy our machine learning models with a simple Python library.

It's an excellent tool, especially if you come from the data science world and have never taken a model out of the Jupyter Notebook’s “happy fields” and into the “production” world.

The popular Python libraries for machine learning either don’t have a way to serve models, because they consider it out of scope, or, when they do, it is not so easy to use. Because of this, many projects rely on delivering their models via APIs built with FastAPI or Flask, which is fine, but not optimal.

In my opinion, BentoML narrows this gap between model training and deployment very nicely.

We will learn more about it in the following sections.

Joining everything together

Now that we know, at least superficially, the tools used, you have probably already figured out how we are going to solve the problem.

Proposed architecture. Image by Author. Icons by Freepik.

We’ll have a Debezium instance watching our database, streaming every change detected to a Kafka topic. On the other side, a Python app consumes the messages and forwards them to the BentoML service, which returns a predicted price. Then, the Python app joins the records with their predicted prices and writes them to another Kafka topic. Finally, the Debezium instance, which is also watching this topic, reads the messages and saves them back into the database.

Ok, that’s a lot of steps, but don’t be scared. I promise that the code for doing all this is very simple.

To make it easier to understand, let’s take an X-ray of the above image and see some internal organs (components) of our creature (architecture).

Proposed architecture X-ray. Image by Author. Icons by Freepik.

All we need to do is to create the database, configure the Debezium connectors (source and sink) and deploy our machine learning model with Python.

The implementation

I’ll try to be brief; the full detailed code will be on GitHub.

The environment

The first thing to do is configure the environment. All you need is:

1. A Python environment with the following packages, used to train and deploy the machine learning model:

numpy
pandas
scikit-learn==1.1.2
xgboost==1.6.1
bentoml
pydantic

2. Docker and docker-compose.

All the infrastructure is built using containers. Also, we will be using Postgres as our database.

And that’s all 👍

Configuring Postgres

The Postgres configuration is very simple: we only need to create a table to store the car data and set wal_level=logical.

SQL script to create the table inside Postgres.
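As a rough idea of what such a script contains, the sketch below assembles a CREATE TABLE statement in Python. The column names and types are my assumptions, based on the dataset's CSV columns renamed to snake_case, with the price left out because it is what we want to predict; the actual script in the repository may differ.

```python
# Hypothetical schema: names and types are assumptions, not the author's script.
CAR_DATA_COLUMNS = {
    "id": "SERIAL PRIMARY KEY",
    "model": "VARCHAR(50)",
    "year": "INTEGER",
    "transmission": "VARCHAR(50)",
    "mileage": "INTEGER",
    "fuel_type": "VARCHAR(50)",
    "tax": "REAL",
    "mpg": "REAL",
    "engine_size": "REAL",
}


def build_create_table(table: str, columns: dict) -> str:
    """Render a CREATE TABLE statement from a {column name: SQL type} mapping."""
    body = ",\n".join(f"    {name} {sql_type}" for name, sql_type in columns.items())
    return f"CREATE TABLE IF NOT EXISTS {table} (\n{body}\n);"


CREATE_CAR_DATA = build_create_table("car_data", CAR_DATA_COLUMNS)
print(CREATE_CAR_DATA)
```

The printed statement can be saved as a .sql file and run when the Postgres container is initialized.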

So, the Postgres Dockerfile is just this:

The wal_level=logical setting is required for Postgres to work correctly with Debezium, which reads the changes from the write-ahead log through logical decoding.

Configuring Debezium and Kafka connectors

Getting started with Kafka and Debezium (with Docker) is straightforward; it is just a matter of configuring the images and connectors correctly. The docker-compose and Dockerfile used in this project were based on one of Debezium’s examples in the official repository.

Note: I’ve hidden some lines to make this code block shorter; check the full code on GitHub.

The Debezium Dockerfile is configured with the drivers for Kafka Connect and Postgres.

With this docker-compose file, we are ready to configure Debezium. To start the containers, type in the terminal:

docker-compose up --build

After some initial configurations (and a lot of logs), the containers should start correctly. Now you can open your browser on localhost:8083.

localhost:8083. Image by Author.

This is the base endpoint of the Kafka Connect REST API that Debezium runs on, which is where all the configuration happens. For example, if we move to localhost:8083/connector-plugins/, it's possible to see all the plugins available to create a connector.

localhost:8083/connector-plugins. Image by Author.

To create a new database connector, we need to send a POST request with the connector’s configuration to the endpoint /connectors. As said before, there are two types of connectors: the source connectors, which retrieve the changes from the database and stream them to Kafka, and the sink connectors, which read messages from Kafka and reflect them in the database.

Let’s create the source connector for our Postgres database and see how this works.

We just need to pass the database address and credentials, the connector class (one of those available at the endpoint /connector-plugins/), and the table that we want to capture.

You can learn more about these connectors and configurations in this post.
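As an illustration, a source connector request could look like the sketch below. The connector name, host, credentials, and database name are placeholders I made up, so adjust them to your docker-compose file; the request is built with the standard library and is not sent until you call urlopen.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083/connectors"  # Kafka Connect REST endpoint

# All values below are placeholders (assumptions), not the author's settings.
source_connector = {
    "name": "car-data-source",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "car_database",
        # Prefix of the generated topic names. Debezium releases before 2.0
        # call this setting "database.server.name".
        "topic.prefix": "car_database",
        "table.include.list": "public.car_data",
    },
}


def build_request(connector: dict, url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector."""
    return urllib.request.Request(
        url,
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_request(source_connector)
# With the containers running, send it with: urllib.request.urlopen(request)
```

The topic name comes from the prefix, the schema, and the table, which is why a prefix of car_database and the table public.car_data produce the topic name used in this post.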

Now, Debezium will create a Kafka topic named car_database.public.car_data and start streaming the changes to it.

Example of record streamed to Kafka. Image by Author.

In the image above, on the left, we can see an entry that I added to the database and, on the right, the message created on Kafka. The message is written in AVRO, which can be understood as a JSON divided into “payload” and “schema”.
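To give an idea of how an application reads such a message, here is a small sketch. It assumes the message arrives as JSON with the usual Debezium envelope (a “payload” holding the row “before” and “after” the change); the sample values are made up, and a connector configured with a flattening transform would produce a simpler shape.

```python
import json

# Hand-written sample shaped like a Debezium change event (values are made up).
raw_message = json.dumps({
    "schema": {"type": "struct", "name": "car_database.public.car_data.Envelope"},
    "payload": {
        "before": None,
        "after": {"id": 1, "model": "Fiesta", "year": 2017, "mileage": 15944},
        "op": "c",  # c = create, u = update, d = delete
        "ts_ms": 1660000000000,
    },
}).encode("utf-8")


def extract_new_row(message: bytes):
    """Return the row as it looks after the change (None for deletes)."""
    event = json.loads(message)
    return event["payload"]["after"]


row = extract_new_row(raw_message)
```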

The sink connector's configuration follows the same logic, but in this case, we also need to give a name for our topic. Debezium will automatically create the sink table (if it does not exist) using the topic’s name, inferring the columns based on the first message sent.

This is why we don’t need to create a second table in Postgres: it will be automatically generated by Debezium.
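A sink configuration could look roughly like the sketch below. It assumes the Confluent JDBC sink connector is installed in the Kafka Connect image, as in Debezium's examples; the connector name, connection URL, and credentials are placeholders.

```python
import json

# Placeholder values; the connector class assumes the Confluent JDBC sink
# connector is available in the Kafka Connect image.
sink_connector = {
    "name": "car-data-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "car_data_predicted",
        "connection.url": "jdbc:postgresql://postgres:5432/car_database",
        "connection.user": "postgres",
        "connection.password": "postgres",
        "auto.create": "true",  # create the table if it does not exist yet
        "insert.mode": "insert",
    },
}

# This JSON document is the body of the POST request sent to /connectors.
payload = json.dumps(sink_connector, indent=2)
print(payload)
```

With auto.create enabled, the table is named after the topic by default, so messages in car_data_predicted end up in a table with the same name.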

Deploying our model with BentoML

The next thing to do is to deploy our machine learning model with BentoML. This is achieved in three steps: saving our model, building a Bento, and transforming it into a Docker container.

Saving a model is not so different from saving any file: you just give it a name and persist it on disk with the save_model function.
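A minimal sketch of this step, assuming the trained model is a scikit-learn estimator (or pipeline) and the BentoML 1.0 API; save_price_model is a hypothetical helper name.

```python
def save_price_model(model, name: str = "ford_price_predictor"):
    """Persist a fitted scikit-learn estimator in the local BentoML model store."""
    # Imported here so this snippet can be loaded even without BentoML installed.
    import bentoml

    saved = bentoml.sklearn.save_model(name, model)
    return saved.tag  # the model name plus an automatically generated version
```

Calling save_price_model(trained_model) at the end of the training script would store the model under a new version each time.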

BentoML provides many functionalities to monitor and version the saved models that are worth checking.

With the model saved, we can build a service to deploy it.

The trained model is loaded as a runner, a special type of object used by BentoML to represent models. The runner is used to create a service object and, with it, we define the /predict endpoint, responsible for receiving a record and returning its predicted price.
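In BentoML 1.0 terms, such a service could be sketched as below. The input and output formats (plain JSON, one record per request) are my assumptions, build_service is a hypothetical helper, and the pydantic validation of the input is left out.

```python
def build_service(model_tag: str = "ford_price_predictor:latest"):
    """Create a BentoML 1.0-style service exposing a /predict endpoint."""
    # Imported here so this snippet can be loaded even without BentoML installed.
    import bentoml
    import pandas as pd
    from bentoml.io import JSON

    # Load the saved model from the local store and wrap it in a runner.
    runner = bentoml.sklearn.get(model_tag).to_runner()
    svc = bentoml.Service("ford_price_predictor", runners=[runner])

    @svc.api(input=JSON(), output=JSON())
    def predict(record: dict) -> dict:
        # One incoming record becomes a one-row DataFrame for the model.
        frame = pd.DataFrame([record])
        prediction = runner.predict.run(frame)
        return {"price": float(prediction[0])}

    return svc


# In service.py, BentoML expects a module-level object, for example:
# svc = build_service()
```

The endpoint path comes from the function name, so a function called predict is served at /predict.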

The last thing to do before deploying our model is to define a bentofile, a special configuration file used by BentoML to describe the deployment environment.

Then, by running the commands below, we create a Docker image with our service named ford_price_predictor:1.0.0.

bentoml build --version 1.0.0
bentoml containerize ford_price_predictor:1.0.0

Finally, we can start a container instance of our service.

docker run -p 3000:3000 ford_price_predictor:1.0.0

And interact with it by accessing localhost:3000.

Connecting Stream and Model

Right now, we have the two main parts of our pipeline built. All that remains is to connect them, and this will be achieved using a Python script.

On one side, Debezium streams data to the Kafka topic car_database.public.car_data and waits for messages in car_data_predicted. On the other, the BentoML service is deployed and waiting for prediction requests on the endpoint /predict.

To connect to Kafka, we are going to use the confluent_kafka package, and to connect to the deployed model, the requests package.

Next, we define the Kafka topics and the bento service URL.
The URLs are not localhost because this script will run inside another container.

Then, a Kafka consumer is created for the source topic and a producer for the sink topic.
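A sketch of this setup with confluent_kafka is shown below. The broker address kafka:9092 and the group id are assumptions that depend on your docker-compose file, and create_clients is a hypothetical helper.

```python
SOURCE_TOPIC = "car_database.public.car_data"
SINK_TOPIC = "car_data_predicted"

# "kafka:9092" is an assumed docker-compose service address.
consumer_config = {
    "bootstrap.servers": "kafka:9092",
    "group.id": "price-predictor",
    "auto.offset.reset": "earliest",  # start from the oldest message on first run
}
producer_config = {"bootstrap.servers": consumer_config["bootstrap.servers"]}


def create_clients():
    """Create the consumer (subscribed to the source topic) and the producer."""
    # Imported here so this snippet can be loaded without confluent_kafka installed.
    from confluent_kafka import Consumer, Producer

    consumer = Consumer(consumer_config)
    consumer.subscribe([SOURCE_TOPIC])
    return consumer, Producer(producer_config)
```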

The consumer retrieves one message at a time from the source topic. Then, just the required fields to make a prediction are extracted from each message.

The message is sent to the machine learning model via a POST request, which returns the predicted price.

Finally, the predicted price is added to the original message so the producer can send it to the sink topic.
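The three steps above can be sketched as a single function. The feature names and the message shape are assumptions, and fake_predict stands in for the HTTP call to the model so the snippet runs without any service.

```python
import json

# Assumed feature names; they must match the columns the model was trained on.
FEATURES = ["model", "year", "transmission", "mileage",
            "fuel_type", "tax", "mpg", "engine_size"]


def enrich(message: bytes, predict) -> bytes:
    """Decode one change event, ask `predict` for a price, and re-encode the row."""
    row = json.loads(message)["payload"]["after"]
    features = {name: row[name] for name in FEATURES}
    row["price"] = predict(features)
    return json.dumps(row).encode("utf-8")


def fake_predict(features: dict) -> float:
    """Stand-in for the POST request to the model service."""
    return 10000.0


sample = json.dumps({"payload": {"after": {
    "id": 1, "model": "Fiesta", "year": 2017, "transmission": "Manual",
    "mileage": 15944, "fuel_type": "Petrol", "tax": 150, "mpg": 57.7,
    "engine_size": 1.0,
}}}).encode("utf-8")

enriched = json.loads(enrich(sample, fake_predict))
```

In the real script, predict would wrap a requests.post call to the BentoML service, the input would come from consumer.poll, and the result would go out through producer.produce followed by producer.flush. Keep in mind that a JDBC-style sink usually expects a schema alongside the payload, which this sketch leaves out.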

Debezium will read this new message and create the respective record in the database.

And we can call it a day!

Seeing it working

It is finally time to see our project working :)

Unfortunately, I don’t have a fancy app to test our pipeline, so we’re going to do it by interacting directly with the database.

Adding an example record. Image by Author.

As we can see in the gif, when a new record is added to the car_data table, another is automatically created in the car_data_predicted table with the suggested price.

If we keep adding more and more records to the car_data table...

Records added. Image by Author.

...they will be duplicated in the car_data_predicted table with the suggested price.

Records with their predicted prices. Image by Author.

In summary, it’s working!

Conclusion

Bringing a machine learning project to life is not a simple task and, just like any other software product, it requires many different kinds of knowledge: infrastructure, business, data science, etc.

I must confess that, for a long time, I just neglected the infrastructure part, letting my projects rest in peace inside Jupyter notebooks. But as soon as I started learning it, I realized that it is a very interesting topic.

Machine learning is still a growing field and, in comparison with other IT-related areas like web development, the community still has a lot to learn. Luckily, in recent years we have seen a lot of new technologies arise to help us build ML applications, like MLflow, Apache Spark’s MLlib, and BentoML, explored in this post.

In this post, a machine learning architecture is explored with some of these technologies to build a real-time price recommender system. To bring this concept to life, we needed not only ML-related tools (BentoML & Scikit-learn) but also other software pieces (Postgres, Debezium, Kafka).

Of course, this is a simple project that doesn’t even have a user interface, but the concepts explored in this post could be easily extended to many cases and real scenarios.

I hope this post helped you somehow. I am not an expert in any of the subjects discussed, so I strongly recommend further reading (see some references below).

Thank you for reading! ;)

References

All the code is available in this GitHub repository.

[1] Debezium official docs
[2] Jiri Pechanec (2017), Streaming data to a downstream database, Debezium Blog
[3] BentoML API I/O descriptors, BentoML Docs
[4] BentoML concepts, BentoML Docs
[5] Kai Waehner (2020), Streaming Machine Learning with Kafka-native Model Deployment, Kai Waehner
[6] Tim Liu, Why Do People Say It’s So Hard To Deploy A ML Model To Production?, BentoML blog
[7] Debezium examples, Debezium official repository on GitHub
[8] Ben Dickson (2022), GitHub Copilot is among the first real products based on large language models, Tech Talks
[9] Ford Used Car Listing, CC0: Public Domain, Kaggle


Bachelor of IT at UFRN. Graduate of BI at UFRN — IMD. Strongly interested in Machine Learning, Data Science and Data Engineering.