Real-time anomaly detection with Apache Kafka and Python

Learn how to make predictions over streaming data coming from Kafka using Python.

Rodrigo Arenas
Towards Data Science


Photo by Aron Visuals on Unsplash.

In this post, I'm going to discuss how to make real-time predictions with incoming stream data from Apache Kafka; the solution we are going to implement looks like this:

Solution Diagram. Image by the Author. Icons from flaticon.

The idea is to:

  • Train an anomaly detection algorithm using unsupervised machine learning.
  • Create a new data producer that sends the transactions to a Kafka topic.
  • Read the data from the Kafka topic and make the prediction using the trained ML model.
  • If the model detects that the transaction is not an inlier, send it to another Kafka topic.
  • Create the last consumer that reads the anomalies and sends an alert to a Slack channel.

The article assumes that you know the basics of Apache Kafka, machine learning, and Python.

The transactions could represent any relevant information to analyze in real-time and predict if there could be something out of the ordinary, such as credit card transactions, GPS logs, system consumption metrics, etc.

1. Project Structure:

Our project structure is shown below; you can get the complete code here or by cloning the repository:

git clone https://github.com/rodrigo-arenas/kafkaml-anomaly-detection.git
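
Based on the files referenced throughout this article, the layout looks roughly like this:

kafkaml-anomaly-detection/
├── requirements.txt
├── settings.py
├── train.py
├── isolation_forest.joblib   # created after running train.py
└── streaming/
    ├── utils.py
    ├── producer.py
    ├── anomalies_detector.py
    └── bot_alerts.py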

First, check settings.py; it has some variables we need to set, such as the Kafka broker host and port; you can keep the defaults (listening on localhost with the default Kafka and ZooKeeper ports).
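
As a rough idea of its contents, it holds values along these lines (the variable names, apart from OUTLIERS_GENERATION_PROBABILITY used later, are assumptions for illustration; check the actual file for the real ones):

# settings.py (sketch; variable names are illustrative)
KAFKA_BROKER = "localhost:9092"          # Kafka bootstrap server
TRANSACTIONS_TOPIC = "transactions"      # topic with the raw transactions
ANOMALIES_TOPIC = "anomalies"            # topic with the detected outliers
OUTLIERS_GENERATION_PROBABILITY = 0.2    # chance that a produced record is an outlier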

The streaming/utils.py file contains the configurations to create Kafka consumers and producers; it has some default options that you can also change if needed.

Now install the requirements:

pip install -r requirements.txt

2. Train the model

To illustrate how to set up this solution, we are going to generate random data with two variables; it looks like this:

Anomaly detection data. Image by the Author.

Next, we are going to use an Isolation Forest model to detect the outliers; in simple words, this model tries to isolate the data points by making random splits over the (sampled) variables and, after several iterations, measures how "hard" it was to isolate each observation. The training logic is in the train.py file.
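
A minimal sketch of what that script does; the data-generation parameters and hyperparameters below are assumptions, so check the repository for the real values:

# train.py (sketch)
import joblib
import numpy as np
from sklearn.ensemble import IsolationForest

# Generate random two-dimensional data: a dense "normal" cluster plus a few scattered outliers
rng = np.random.RandomState(42)
normal = 0.3 * rng.randn(500, 2)                       # points clustered around the origin
outliers = rng.uniform(low=-4, high=4, size=(50, 2))   # points spread far from the cluster
X_train = np.concatenate([normal, outliers])

# Fit an Isolation Forest; contamination is the expected share of outliers in the data
model = IsolationForest(n_estimators=100, contamination=0.1, random_state=42)
model.fit(X_train)

# Persist the trained model so the streaming consumers can load it later
joblib.dump(model, "isolation_forest.joblib")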

After running this, the isolation_forest.joblib file should be created; this is the trained model.

3. Create the Topics

We will use two topics; the first one is called "transactions," where the producer will send new transaction records. Let's create it from the terminal with:

kafka-topics.sh --zookeeper localhost:2181 --topic transactions --create --partitions 3 --replication-factor 1

The second topic is going to be called "anomalies"; this is where the anomaly-detection module will send the data, and the last consumer will read it from there to send a Slack notification:

kafka-topics.sh --zookeeper localhost:2181 --topic anomalies --create --partitions 3 --replication-factor 1
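
If you are running a newer Kafka release where kafka-topics.sh no longer accepts the --zookeeper flag, the equivalent commands go through the brokers instead; for example:

kafka-topics.sh --bootstrap-server localhost:9092 --topic anomalies --create --partitions 3 --replication-factor 1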

4. Transactions Producer:

Now we are going to create the first producer, which will send new data to the Kafka topic "transactions"; we'll use the confluent-kafka package, and the code lives in streaming/producer.py.
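
A minimal sketch of what it does, written directly against confluent-kafka; the constants and the way outliers are generated are simplified assumptions (in the repository they come from settings.py and the utils helpers):

# streaming/producer.py (sketch)
import json
import random
import time
from datetime import datetime

import numpy as np
from confluent_kafka import Producer

# In the repository these values come from settings.py
KAFKA_BROKER = "localhost:9092"
TRANSACTIONS_TOPIC = "transactions"
OUTLIERS_GENERATION_PROBABILITY = 0.2

producer = Producer({"bootstrap.servers": KAFKA_BROKER})

_id = 0
while True:
    # With a small probability, draw the point from the "outlier generator"
    if random.random() <= OUTLIERS_GENERATION_PROBABILITY:
        data = np.random.uniform(low=-4, high=4, size=2)   # far from the normal cluster
    else:
        data = 0.3 * np.random.randn(2)                     # regular observation

    record = {
        "id": _id,                                       # auto-incremented id
        "data": data.tolist(),                           # features for the ML model
        "current_time": datetime.utcnow().isoformat(),   # current time in UTC
    }
    producer.produce(TRANSACTIONS_TOPIC, value=json.dumps(record))
    producer.flush()

    _id += 1
    time.sleep(0.5)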

With this code, the producer sends data to the Kafka topic; with a probability of OUTLIERS_GENERATION_PROBABILITY, the data point comes from an "outlier generator." Each message carries an auto-incremented id, the data the machine learning model needs, and the current time in UTC.

Let's check that everything is correct so far: run the producer.py file, and listen to the topic as a consumer from the terminal:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic transactions

You should see the incoming messages like this:

Transactions Producer. Gif by the author.

5. Outlier Detector Consumer:

The data is coming! Now we must read it from a consumer, pass it to the machine learning model to make the predictions, and filter out the outliers. This is done in the streaming/anomalies_detector.py file.
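
A simplified sketch of that logic; it talks to confluent-kafka directly instead of going through the repository's utils helpers, and it assumes the message fields from the producer sketch above:

# streaming/anomalies_detector.py (sketch)
import json
from multiprocessing import Process

import joblib
import numpy as np
from confluent_kafka import Consumer, Producer

KAFKA_BROKER = "localhost:9092"
TRANSACTIONS_TOPIC = "transactions"
ANOMALIES_TOPIC = "anomalies"

# Load the Isolation Forest trained in train.py
model = joblib.load("isolation_forest.joblib")


def detect():
    consumer = Consumer({
        "bootstrap.servers": KAFKA_BROKER,
        "group.id": "anomaly-detectors",   # all processes share the same group_id
        "auto.offset.reset": "latest",
    })
    consumer.subscribe([TRANSACTIONS_TOPIC])
    producer = Producer({"bootstrap.servers": KAFKA_BROKER})

    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue

        record = json.loads(msg.value())
        features = np.array(record["data"]).reshape(1, -1)

        prediction = model.predict(features)[0]   # -1 means outlier, 1 means inlier
        if prediction == -1:
            # Enrich the record with the anomaly score and forward it
            record["score"] = float(model.decision_function(features)[0])
            producer.produce(ANOMALIES_TOPIC, value=json.dumps(record))
            producer.flush()


if __name__ == "__main__":
    # One process per topic partition, all in the same consumer group
    processes = [Process(target=detect) for _ in range(3)]
    for p in processes:
        p.start()
    for p in processes:
        p.join()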

A consumer reads the messages from the "transactions" topic, and a producer sends the outliers to the "anomalies" topic; besides the data we already had, it enriches each record with the score given by the model, a measure of "how much" the data point is considered an outlier.

Notice that the only messages that go to the new topic are those whose prediction output is -1; this is how the model flags a data point as an outlier (inliers are labeled 1).

Also, notice that the topic has three partitions, so at the end, I'm using multiprocessing to simulate three independent consumers and speed up the process; they all belong to the same group_id. In production, those consumers would probably run on different servers.

Let's check this step: make sure the producer is running, and run the anomalies_detector.py file. Now, in a terminal, let's open the anomalies topic; we should see the incoming transactions that the model predicted as outliers. It should look like this:

Anomalies detection. Gif by the Author.

Here is a visualization of how the transactions producer and the outlier detection run simultaneously; the top window is the transactions topic, and the bottom one is the outliers sent to the anomalies topic.

Real-time anomalies detection. Gif by the Author.

6. Slack Notification:

As the last step, we want to take some actions with these detected outliers; in a real-life scenario, it could block a transaction, scale a server, generate a recommendation, send an alert to an administrative user, etc.

Here we are going to send an alert to a Slack channel; for this, make sure to create a Slack app, add the app to the Slack channel, and register an environment variable called SLACK_API_TOKEN to authenticate with Slack. Here is the related documentation.

Now we use the file streaming/bot_alerts.py.
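
A minimal sketch of the idea, here using the slack_sdk client; the channel name is a placeholder, and the repository's actual code may use a different Slack library:

# streaming/bot_alerts.py (sketch)
import os

from confluent_kafka import Consumer
from slack_sdk import WebClient

KAFKA_BROKER = "localhost:9092"
ANOMALIES_TOPIC = "anomalies"
SLACK_CHANNEL = "#anomalies-alerts"   # placeholder channel name

# The token comes from the SLACK_API_TOKEN environment variable
slack_client = WebClient(token=os.environ["SLACK_API_TOKEN"])

consumer = Consumer({
    "bootstrap.servers": KAFKA_BROKER,
    "group.id": "slack-notifier",
    "auto.offset.reset": "latest",
})
consumer.subscribe([ANOMALIES_TOPIC])

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue

    # Forward the raw anomaly record to the Slack channel
    record = msg.value().decode("utf-8")
    slack_client.chat_postMessage(channel=SLACK_CHANNEL, text=record)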

So here, the script creates a new consumer, but this one is subscribed to the "anomalies" topic; once a message arrives, it uses the Slack API to send the message. For this demo, I send the raw message as-is (try to make it prettier!). The incoming messages look like this:

Slack Notification. Gif by the Author.

So that is it; the solution is up and running! I hope this was useful for you. Remember that the full code is here:

I want to leave some final considerations about this particular implementation:

  • All the settings and runs were made on a local machine; the choice of how many partitions, consumers, brokers, ZooKeeper servers, and replicas to set (and other configurations) is something you must analyze based on your business characteristics, the rate at which the data is generated, the available resources, etc. I used numbers small enough for the demo.
  • I used scikit-learn and "pure" Python to process the data stream, but depending on the message volume/production rate, it could be necessary to use stream-processing capabilities, like Spark Streaming.
  • There are also limits on the Slack API you must be aware of.


Data Scientist and open-source contributor working on machine learning and optimization; for all my projects, check: https://rodrigo-arenas.github.io/portfolio