
Using Kafka with Python

Learn how to create producers and consumers with Python and plot a dynamic scatter plot

Photo by Markus Winkler on Unsplash

In my previous article on Kafka, I introduced the use of Kafka for data streaming. I also showed how you can start a Kafka broker service and demonstrated how to use the Kafka producer console application to send messages and the Kafka consumer console application to receive messages.

In this article, I am going to show you how to make use of Kafka using Python. Specifically, I will:

  • Use Python to send messages to a Kafka broker service
  • Use Python to receive messages from a Kafka broker service
  • Build a dynamic charting application to plot and update a scatter plot whenever new data is received from the broker service

Using Python with Kafka

There are at least three Python libraries available for interfacing with Kafka broker services:

  • kafka-python
  • pykafka
  • confluent-kafka (Confluent’s Python client for Apache Kafka)

For this article, I will make use of the Confluent Kafka Python package.

To install the Confluent Kafka Python package, use the pip command:

!pip install confluent-kafka

Producing the Message

First, let’s work on the producer. The producer is the one that sends messages to the Kafka broker service. The following code snippet specifies the Kafka broker server to connect to:

from confluent_kafka import Producer
import socket

conf = {
    'bootstrap.servers': "localhost:9092",
    'client.id': socket.gethostname()
}

producer = Producer(conf)

You need to ensure that your Kafka broker service is up and running. See my earlier article, Using Apache Kafka for Data Streaming (https://towardsdatascience.com/using-apache-kafka-for-data-streaming-9199699623fa), for details on how to start one.
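If you are not sure whether the broker is reachable, one quick sanity check (a sketch of mine, not part of the original walkthrough) is to ask it for its topic metadata using the AdminClient that ships with confluent-kafka:

from confluent_kafka.admin import AdminClient

# assumes the broker is running on localhost:9092
admin = AdminClient({'bootstrap.servers': "localhost:9092"})
try:
    metadata = admin.list_topics(timeout=5)   # raises if the broker cannot be reached
    print("Connected. Topics:", list(metadata.topics.keys()))
except Exception as e:
    print("Broker not reachable:", e)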

To send a message, you can use the produce() function from the Producer object. You can pass four arguments to it:

  • the topic to send the message to
  • the key for your message payload
  • the message payload
  • the callback function to invoke when you poll the producer to know if the message was successfully delivered (or not).

The following code snippet sends a message to the Kafka broker service:

def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

producer.produce("SomeTopic", key="key1", value="Hello", callback=acked)     
producer.poll(1)   # Maximum time (1s) to block while waiting for events

The poll() function returns the number of events processed (callbacks served).
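Note that produce() is asynchronous: it only enqueues the message in a local buffer and returns immediately. If your script is about to exit, call flush() to block until all buffered messages have been delivered (or have failed):

producer.flush()   # block until all outstanding messages are delivered or have failed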

Let’s send another message with a different key:

producer.produce("SomeTopic", key="key2", value="World", callback=acked)     
producer.poll(1)

Consuming the Message

With the message sent, you can now work on the consumer. The following code snippet connects to the Kafka broker service:

from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "1",
    'auto.offset.reset': 'latest'
}

consumer = Consumer(conf)

The group.id setting indicates the consumer group that your consumer belongs to. If two consumers with the same group ID subscribe to the same topic, they will share the work of reading from that topic.

The auto.offset.reset setting specifies how a consumer should behave when it starts consuming from a topic partition for which there is no initial committed offset.
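For example, if you want a brand-new consumer group to replay everything already in the topic instead of only new messages, you can set auto.offset.reset to 'earliest' (a sketch; the group name replay-group is just an illustrative placeholder, and the rest of this article sticks with 'latest'):

conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "replay-group",        # a new group with no committed offsets
    'auto.offset.reset': 'earliest'    # start from the beginning of each partition
}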

I will discuss the offset in another article.

To consume messages, let’s define a function named consume(). It takes in the consumer as well as the topics to subscribe to:

import sys
import threading

from confluent_kafka import KafkaError, KafkaException

def consume(consumer, topics):
    try:
        consumer.subscribe(topics)
        # use the thread's "run" attribute as a way to stop the loop
        t = threading.current_thread()
        while getattr(t, "run", True):
            msg = consumer.poll(timeout=5.0)
            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                key = msg.key().decode("utf-8")
                data = msg.value().decode("utf-8")
                print(key, data)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

In this function, you first subscribe to the topics that you want to listen to. Then, you use the threading.current_thread() function to check whether an attribute named run has been set on the thread running this function. This attribute controls whether the loop keeps waiting for the next message or exits.

We use an infinite loop to keep polling the Kafka broker service. The timeout parameter sets the maximum time the call blocks while waiting for a message from the broker service. If you want to poll the broker at a higher frequency, set the timeout to a lower value, such as 0.5 seconds.

If a message is returned, you can extract its key and value and then print them out. Finally, if the infinite loop is terminated, you close the consumer.

To run the consume() function, we will use the threading package:

import threading
thread = threading.Thread(target=consume, 
                          args=(consumer, ["SomeTopic"]))
thread.start()

The start() function runs the consume() function as an independent thread so that it does not freeze your Jupyter Notebook.

You should now be able to see two incoming messages that were sent by the producer:

key1 Hello
key2 World

To terminate the consumer, simply set the run attribute of thread to False and the consume() function will stop running:

thread.run = False
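Setting an ad-hoc attribute on the thread works, but if you prefer a more idiomatic stop signal, a threading.Event achieves the same effect (a minimal sketch, not the approach used in the rest of this article):

import threading

stop_event = threading.Event()

def consume(consumer, topics, stop_event):
    try:
        consumer.subscribe(topics)
        while not stop_event.is_set():
            msg = consumer.poll(timeout=5.0)
            # ... handle msg exactly as before ...
    finally:
        consumer.close()

# later, to stop the consumer:
stop_event.set()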

Plotting a Chart

Now that you are able to produce and consume messages using the Confluent Kafka Python package, it is time to do something useful with this newfound knowledge!

Let’s use the producer to simulate an IoT device sending sensor data to the Kafka broker service; on the consumer end, we will read the data and use it to plot a chart. As new data is received, the chart will be dynamically updated. All of this will work directly in Jupyter Notebook.

Sending Sensor Data

Let’s define a function named send_message(), which takes in four arguments:

  • topic – the topic for the message
  • datetime – the datetime of the sensor data collected
  • temp1 – the temperature reading for sensor 1
  • temp2 – the temperature reading for sensor 2

These four arguments will then be used to send the message:

def send_message(topic, datetime, temp1, temp2):
    producer.produce(topic, key="key", value=f"{datetime},{temp1},{temp2}", callback=acked)
    producer.poll(1)

We will also define a function named update() that calls the send_message() function with some random values every two seconds:

import threading
import datetime
import random

def update():
    threading.Timer(2.0, update).start()     # call update() every 2s
    send_message('SensorData',
                 datetime.datetime.utcnow(), # datetime in UTC
                 random.uniform(20, 39),     # temperature1
                 random.uniform(10, 29))     # temperature2

update()
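One thing to be aware of: threading.Timer starts a new thread for every tick. An equivalent approach (a sketch of an alternative, not what this article uses) is a single background thread that loops with a sleep, which is easier to stop cleanly:

import threading
import time

def produce_loop(stop_event):
    while not stop_event.is_set():
        send_message('SensorData',
                     datetime.datetime.utcnow(),
                     random.uniform(20, 39),
                     random.uniform(10, 29))
        time.sleep(2.0)

stop_sending = threading.Event()
threading.Thread(target=produce_loop, args=(stop_sending,)).start()
# call stop_sending.set() to stop producing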

Consuming the Message and Plotting the Chart

For the consumer, let’s create the Consumer object:

from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "1",
    'auto.offset.reset': 'latest'
}

consumer = Consumer(conf)

Then we will make use of Plotly to add two scatter plots to a FigureWidget:

A FigureWidget is a Plotly figure object that renders as an interactive widget directly in Jupyter Notebook.

from confluent_kafka import KafkaError, KafkaException

import time
import plotly.graph_objects as go
import numpy as np

# initialize a plot
fig = go.FigureWidget()

# add two scatter plots
fig.add_scatter(fill='tozeroy')
fig.add_scatter(fill='tozeroy')
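Optionally, you can name the two traces and label the axes so the chart is easier to read (an optional touch of mine; the original figure uses Plotly’s defaults):

fig.data[0].name = 'Sensor 1'
fig.data[1].name = 'Sensor 2'
fig.update_layout(title='Sensor Temperatures',
                  xaxis_title='Time',
                  yaxis_title='Temperature')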

We can now poll for messages from the Kafka broker service:

def consume(consumer, topics):
    counter = 0
    x  = []   # datetime
    y1 = []   # first temp
    y2 = []   # second temp
    n  = 12   # the number of points to display on the plot
    try:
        consumer.subscribe(topics)        
        t = threading.current_thread()
        while getattr(t, "run", True):
            msg = consumer.poll(timeout=2.0)
            if msg is None:         
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                data = msg.value().decode("utf-8")
                x.append(data.split(',')[0])          # datetime
                y1.append(float(data.split(',')[1]))  # first temp
                y2.append(float(data.split(',')[2]))  # second temp

                #---display the last n points---                
                # first scatter plot
                fig.data[0].x = x[-n:]                # datetime
                fig.data[0].y = y1[-n:]               # temp                
                # second scatter plot
                fig.data[1].x = x[-n:]                # datetime
                fig.data[1].y = y2[-n:]               # temp
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

display(fig)

Let’s dissect the above code snippet. When a message is obtained from the broker, it is split into three parts – datetime, temp1, and temp2. They are then appended to the x, y1, and y2 lists, respectively:

data = msg.value().decode("utf-8")
x.append(data.split(',')[0])          # datetime
y1.append(float(data.split(',')[1]))  # first temp
y2.append(float(data.split(',')[2]))  # second temp

As time passes, the lists will accumulate a lot of data, so we only plot the last n points (set to 12 in this example). To update the first scatter plot, set the fig.data[0].x and fig.data[0].y attributes. For the second scatter plot, set the fig.data[1].x and fig.data[1].y attributes:

#---display the last n points---
# first scatter plot
fig.data[0].x = x[-n:]                # datetime
fig.data[0].y = y1[-n:]               # temp

# second scatter plot
fig.data[1].x = x[-n:]                # datetime
fig.data[1].y = y2[-n:]               # temp
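Since only the last n points are ever displayed, letting the lists grow without bound is a little wasteful. A collections.deque with maxlen does the trimming for you (a sketch of an alternative, not what the code above uses):

from collections import deque

n = 12
x  = deque(maxlen=n)   # oldest points are discarded automatically
y1 = deque(maxlen=n)
y2 = deque(maxlen=n)

# inside the loop, after appending new values:
fig.data[0].x = tuple(x)
fig.data[0].y = tuple(y1)
fig.data[1].x = tuple(x)
fig.data[1].y = tuple(y2)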

And that’s it! Whenever new messages are received, the scatter plots will automatically update themselves!

For now, when you run the code, an empty plot is shown:

Image by author

You are now ready to spin off a thread to run the consume() function:

import threading
thread = threading.Thread(target=consume, args=(consumer, ["SensorData"]))
thread.start()

After a while, you should see the two scatter plots updating:

Image by author


As usual, to stop the consume() function, set the run attribute of thread to False:

thread.run = False


Summary

This article demonstrated one useful application that you can build with Kafka. In particular, I showed you how to build a dynamic charting application using Python and Plotly. The real-time streaming nature of Kafka makes it an ideal candidate for applications that require low-latency data updates, such as IoT applications. Stay tuned for the next Kafka article!

