
In my previous article on Kafka, I introduced the use of Kafka for data streaming. I also showed how you can start a Kafka broker service and demonstrated how to use the Kafka producer console application to send messages and the Kafka consumer console application to receive messages.
In this article, I am going to show you how to make use of Kafka using Python. Specifically, I will:
- Use Python to send messages to a Kafka broker service
- Use Python to receive messages from a Kafka broker service
- Build a dynamic charting application to plot and update a scatter plot whenever new data is received from the broker service
Using Python with Kafka
There are at least three Python libraries available for Python developers to interface with Kafka broker services. They are:
- Kafka-Python
- PyKafka
- Confluent Kafka Python
For this article, I will make use of the Confluent Kafka Python package.
To install the Confluent Kafka Python package, use the pip command:
!pip install confluent-kafka
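To quickly verify the installation, you can print the version of the Python client and of the underlying librdkafka library (a minimal sanity check):

import confluent_kafka

print(confluent_kafka.version())      # version of the Python client
print(confluent_kafka.libversion())   # version of the bundled librdkafka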
Producing the Message
First, let's work on the producer, which sends messages to the Kafka broker service. The following code snippet specifies the Kafka broker server to connect to:
from confluent_kafka import Producer
import socket

conf = {
    'bootstrap.servers': "localhost:9092",
    'client.id': socket.gethostname()
}

producer = Producer(conf)
You need to ensure that your Kafka broker service is up and running. See https://towardsdatascience.com/using-apache-kafka-for-data-streaming-9199699623fa for more details on how to start one.
To send a message, you can use the produce() function from the Producer object. You can pass four arguments to it:
- the topic to send the message to
- the key for your message payload
- the message payload
- the callback function to invoke when you poll the producer to know if the message was successfully delivered (or not).
The following code snippet sends a message to the Kafka broker service:
def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

producer.produce("SomeTopic", key="key1", value="Hello", callback=acked)
producer.poll(1)   # maximum time (1s) to block while waiting for events
The poll() function returns the number of events processed (callbacks served).
Let’s send another message with a different key:
producer.produce("SomeTopic", key="key2", value="World", callback=acked)
producer.poll(1)
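Note that produce() is asynchronous: messages are buffered internally and sent to the broker in batches. If you need to block until every queued message has been delivered (for example, before a script exits), you can call flush():

producer.flush()   # block until all outstanding messages are delivered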
Consuming the Message
With the message sent, you can now work on the consumer. The following code snippet connects to the Kafka broker service:
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "1",
    'auto.offset.reset': 'latest'
}

consumer = Consumer(conf)
The group.id setting indicates which consumer group this consumer belongs to. If two consumers with the same group ID subscribe to the same topic, they share the work of reading from that topic.
The auto.offset.reset setting specifies how a consumer should behave when consuming from a topic partition for which there is no initial offset.
I will discuss the offset in another article.
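In the meantime, here is the practical difference. With 'latest', a consumer group that has no committed offset only receives messages produced after it subscribes; with 'earliest', it starts from the oldest message still retained by the broker. For example (a sketch; replay_conf, replay_consumer, and the group ID "2" are names made up for illustration):

replay_conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "2",                  # a new group with no committed offset
    'auto.offset.reset': 'earliest'   # start from the oldest retained message
}
replay_consumer = Consumer(replay_conf)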
To consume messages, let's define a function named consume(). It takes in the consumer as well as the topics to subscribe to:
import sys
import threading
from confluent_kafka import KafkaError, KafkaException

def consume(consumer, topics):
    try:
        consumer.subscribe(topics)
        # use the thread's "run" attribute as a way to stop the loop
        t = threading.current_thread()
        while getattr(t, "run", True):
            msg = consumer.poll(timeout=5.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                key = msg.key().decode("utf-8")
                data = msg.value().decode("utf-8")
                print(key, data)
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()
In this function, you first subscribe to the topics that you want to listen to. Then, you use the threading.current_thread() function to check whether an attribute named run has been set on the thread running this function. This is used to control whether you should continue to wait for the next message or exit the function.
We will use an infinite loop to keep polling the Kafka broker service. The timeout parameter sets the maximum time the call blocks while waiting for a message from the broker service. If you want to poll the broker at a higher frequency, set the timeout to a lower value, such as 0.5 seconds.
If a message is returned, you can extract its key and value and then print them out. Finally, if the infinite loop is terminated, you close the consumer.
To run the consume() function, we will use the threading package:
import threading

thread = threading.Thread(target=consume,
                          args=(consumer, ["SomeTopic"]))
thread.start()
The start() function runs the consume() function as an independent thread so that it does not freeze your Jupyter Notebook.
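If you prefer, you can also mark the thread as a daemon when creating it so that it will not keep the Python process alive on shutdown. Be aware, though, that daemon threads are stopped abruptly, so the finally block that closes the consumer may never run (an optional variation, not part of the original code):

thread = threading.Thread(target=consume,
                          args=(consumer, ["SomeTopic"]),
                          daemon=True)   # do not block interpreter shutdown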
You should now be able to see two incoming messages that were sent by the producer:
key1 Hello
key2 World
To terminate the consumer, simply set the run attribute of thread to False and the consume() function will stop running:
thread.run = False
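Because the consumer polls with a 5-second timeout, the loop may take up to one more poll cycle to notice the flag. If you want to block until the thread has actually finished and the consumer has been closed, you can wait on it (an optional step):

thread.join()   # wait for consume() to return and the consumer to close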
Plotting a Chart
Now that you are able to produce and consume messages using the Confluent Kafka Python package, it is time to do something useful with this newfound knowledge!
Let's use the producer to simulate an IoT device sending sensor data to the Kafka broker service, and on the consumer end we will read the data and use it to plot a chart. As new data is received, the chart will be dynamically updated. All of this will work directly in Jupyter Notebook.
Sending Sensor Data
Let's define a function named send_message(), which takes in four arguments:
- topic – the topic for the message
- datetime – the datetime of the sensor data collected
- temp1 – the temperature reading for sensor 1
- temp2 – the temperature reading for sensor 2
These four arguments will then be used to send the message:
def send_message(topic, datetime, temp1, temp2):
    producer.produce(topic, key="key",
                     value=f"{datetime},{temp1},{temp2}", callback=acked)
    producer.poll(1)
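For example, a single test message might look like this (the values here are arbitrary):

import datetime

send_message('SensorData', datetime.datetime.utcnow(), 25.1, 18.3)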
We will also define a function named update() that will be called every 2 seconds to call the send_message() function with some random values:
import threading
import datetime
import random

def update():
    threading.Timer(2.0, update).start()   # call update() again after 2s
    send_message('SensorData',
                 datetime.datetime.utcnow(),   # datetime in UTC
                 random.uniform(20, 39),       # temperature1
                 random.uniform(10, 29))       # temperature2

update()
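One thing to be aware of: as written, update() reschedules itself forever and keeps no reference to the timer, so the simulation cannot easily be stopped. If you want to be able to stop it, one option is to hold on to the latest Timer object (a sketch; the timer variable is my addition):

timer = None

def update():
    global timer
    timer = threading.Timer(2.0, update)   # keep a handle to the next run
    timer.start()
    send_message('SensorData',
                 datetime.datetime.utcnow(),
                 random.uniform(20, 39),
                 random.uniform(10, 29))

update()
# later, to stop the simulation:
# timer.cancel()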
Consuming the Message and Plotting the Chart
For the consumer, let's create the Consumer object:
from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': "localhost:9092",
    'group.id': "1",
    'auto.offset.reset': 'latest'
}

consumer = Consumer(conf)
Then we will make use of Plotly to add two scatter plots to a FigureWidget. A FigureWidget is a Plotly figure object that can display charts, and update them in place, inside Jupyter Notebook:
from confluent_kafka import KafkaError, KafkaException
import time
import plotly.graph_objects as go
import numpy as np

# initialize a plot
fig = go.FigureWidget()

# add two scatter plots
fig.add_scatter(fill='tozeroy')
fig.add_scatter(fill='tozeroy')
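Optionally, you can name the traces and label the axes; these are cosmetic touches that are not part of the original code:

fig.data[0].name = 'temp1'
fig.data[1].name = 'temp2'
fig.update_layout(xaxis_title='datetime',
                  yaxis_title='temperature')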
We can now poll for messages from the Kafka broker service:
def consume(consumer, topics):
    x = []    # datetime
    y1 = []   # first temp
    y2 = []   # second temp
    n = 12    # the number of points to display on the plot
    try:
        consumer.subscribe(topics)
        t = threading.current_thread()
        while getattr(t, "run", True):
            msg = consumer.poll(timeout=2.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (msg.topic(), msg.partition(), msg.offset()))
                else:
                    raise KafkaException(msg.error())
            else:
                data = msg.value().decode("utf-8")
                x.append(data.split(',')[0])          # datetime
                y1.append(float(data.split(',')[1]))  # first temp
                y2.append(float(data.split(',')[2]))  # second temp

                #---display the last n points---
                # first scatter plot
                fig.data[0].x = x[-n:]    # datetime
                fig.data[0].y = y1[-n:]   # temp
                # second scatter plot
                fig.data[1].x = x[-n:]    # datetime
                fig.data[1].y = y2[-n:]   # temp
    finally:
        # Close down consumer to commit final offsets.
        consumer.close()

display(fig)   # show the (initially empty) figure
Let's dissect the above code snippet. When a message is obtained from the broker, it is split into three parts – datetime, temp1, and temp2. They are then appended to the x, y1, and y2 lists, respectively:
data = msg.value().decode("utf-8")
x.append(data.split(',')[0]) # datetime
y1.append(float(data.split(',')[1])) # first temp
y2.append(float(data.split(',')[2])) # second temp
As time passes, the lists will contain a lot of data, so we only want to plot the last n points (set to 12 in this example). To update the first scatter plot, set the fig.data[0].x and fig.data[0].y attributes. For the second scatter plot, set the fig.data[1].x and fig.data[1].y attributes:
#---display the last n points---
# first scatter plot
fig.data[0].x = x[-n:] # datetime
fig.data[0].y = y1[-n:] # temp
# second scatter plot
fig.data[1].x = x[-n:] # datetime
fig.data[1].y = y2[-n:] # temp
And that's it! Whenever new messages are received, the scatter plots will automatically update themselves!
For now, when you run the code, an empty plot is shown:

You are now ready to spin off a thread to run the consume() function:
import threading
thread = threading.Thread(target=consume, args=(consumer, ["SensorData"]))
thread.start()
After a while, you should see the two scatter plots updating:

As usual, to stop the consume() function, set the run attribute of thread to False:
thread.run = False
Summary
This article demonstrates one useful application that you can build with Kafka. In particular, I showed you how to build a dynamic charting application using Python and Plotly. The real-time streaming nature of Kafka makes it an ideal candidate for applications that require low-latency data updates, such as IoT applications. Stay tuned for the next Kafka article!