
Background
Lately, I’ve been assigned a seemingly straightforward task →
"We started to capture events from a new app. Could you verify such test events landed in Kafka properly?"
I have a rough idea of how data flows in our system: clickstream data collected from both the web and mobile apps flows to MetaRouter first, and MetaRouter then acts as a Kafka producer, writing event logs to a certain Kafka topic. The events generated from our new app share a writeKey. Therefore, in order to read those events, I would need to:
- Create a Kafka consumer to listen to this Kafka topic
- Since I know the test events were produced in a certain date range, build the Kafka consumer so it only reads events for the specified dates.
- Store the data in a form I can filter and analyze, ideally a pandas DataFrame.
So my goal is to achieve the data flow from Kafka -> Pandas!
After numerous days of googling around StackOverflow, GitHub, and various sites, I finally got it working! Below are my code snippets showing how to implement it with two of the most popular Python libraries for Kafka.
Solution 1: Using the kafka-python library
Prerequisite: pip install kafka-python (the latest version, 2.0.2, is used in my notebook)
First, we need to import the libraries below and set the Kafka environment variables. This setup script can be reused in Solution 2 with one minor change.
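Here's a minimal sketch of that setup cell. The topic name is the one used later in this post; the broker address, group id, and partition number are placeholders you'd swap for your own environment:

from datetime import datetime, timezone
import json
import pandas as pd
from kafka import KafkaConsumer, TopicPartition  # Solution 2 swaps this import for confluent_kafka

# Kafka environment variables (placeholder values -- use your own)
topic = 'analytics__pageview'
bootstrap_servers = 'broker1.example.com:9092'
group_id = 'test-event-checker'
partition = 0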
Next, build a Kafka consumer to read events from a specific datetime range. There are 5 steps:
- Step 1: Since I know the test events were sent on 2022-09-22 between 12:00 and 14:00 (UTC), I use the datetime function to create dt_start and dt_end to bound the time range.
- Step 2: In Kafka, only events within the same partition are ordered, so we need to read events from a specified partition. (Say you have 6 partitions for the topic; you can pick any number from 0-5 to use as the partition.)
- Step 3: The basic consumer requires topic, bootstrap_servers, and group_id. I found that in Jupyter Notebook, if I don't provide security_protocol, it throws errors.
- Step 4: This is the key! The way it works is like this:
- datetime object → converted to a UTC timestamp in milliseconds → converted to the corresponding offset number in a TopicPartition
- the essential function is consumer.offsets_for_times({tp: dt_start.timestamp() * 1000})
- Step 5: Use seek to fetch events starting from the desired starting time. Each message has an offset property, and we compare it with the offset of the desired ending time to decide whether to continue or break.
Enough talking, and here’s the full code →
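A minimal sketch of the five steps, using the placeholder setup above (the security settings and the assumption that event payloads are JSON-encoded are mine; adapt them to your cluster):

from datetime import datetime, timezone
import json
import pandas as pd
from kafka import KafkaConsumer, TopicPartition

# Setup (placeholder values -- use your own)
topic = 'analytics__pageview'
bootstrap_servers = 'broker1.example.com:9092'
group_id = 'test-event-checker'

# Step 1: bound the time range (2022-09-22, 12:00-14:00 UTC)
dt_start = datetime(2022, 9, 22, 12, 0, tzinfo=timezone.utc)
dt_end = datetime(2022, 9, 22, 14, 0, tzinfo=timezone.utc)

# Step 2: read from one specific partition (events are ordered only within a partition)
partition = 0
tp = TopicPartition(topic, partition)

# Step 3: the basic consumer (security_protocol is an assumption -- match your cluster)
consumer = KafkaConsumer(
    bootstrap_servers=bootstrap_servers,
    group_id=group_id,
    security_protocol='SSL',
    enable_auto_commit=False,
    consumer_timeout_ms=10000,  # stop iterating if no message arrives for 10s
)
consumer.assign([tp])

# Step 4: datetime -> UTC timestamp in milliseconds -> offset in the TopicPartition
offset_start = consumer.offsets_for_times({tp: int(dt_start.timestamp() * 1000)})
offset_end = consumer.offsets_for_times({tp: int(dt_end.timestamp() * 1000)})

# Step 5: seek to the starting offset, then read until we pass the ending offset
consumer.seek(tp, offset_start[tp].offset)
records = []
for msg in consumer:
    if msg.offset >= offset_end[tp].offset:
        break
    records.append(json.loads(msg.value))  # assumption: event payloads are JSON
consumer.close()

df = pd.DataFrame(records)
df.head()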
And after that, I can query the writeKey of our new app in Pandas! 🐼
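Something along these lines, assuming writeKey ends up as a top-level column in the DataFrame (the key value below is a placeholder):

new_app_events = df[df['writeKey'] == 'our-new-app-write-key']  # placeholder writeKey
new_app_events.head()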

The above solution is inspired by a similar question on StackOverflow. Actually, this is where I started my extensive search and found there was no equivalent solution using confluent-kafka. Since my original code was based on confluent-kafka instead of kafka-python, I was puzzled by their seeming similarity yet nuanced differences.
Now I’m happy to introduce my own solution using confluent-kafka 😃 ~~~
Solution 2: Using the confluent-kafka library
Prerequisite: pip install confluent-kafka (the latest version, 1.9.2, is used in my notebook)
Here you can use the same setup script from Solution 1 with one minor change:
- change the kafka-python import line to from confluent_kafka import Consumer, TopicPartition
Next, we need to build a Kafka consumer to read events from a specific datetime range. At a high level, we still need the same 5 steps, but the main difference is that we need to use on_assign to achieve what seek does: fetch a specific offset from a TopicPartition.
Step 1: Same as Solution 1, we need datetime objects to bound the search range.
Step 2: Same as Solution 1. One tricky thing is that most of the time you can use a string as the topic, such as topic = 'analytics__pageview', but when you want to subscribe, it only accepts a list, such as consumer.subscribe(['analytics__pageview'])! (As Dumbledore might say: "How peculiar~~" 🧙)
Step 3: Almost identical to Solution 1, except that = is replaced with : in the variable assignments, since the consumer is configured with a dictionary.
Step 4: Here's where you'll see the nuanced differences! We need an extra step 4c to construct on_assign; a sketch of it follows below. This function originally comes from a confluent-kafka-python GitHub issue, provided by Magnus Edenhill.
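Roughly, the callback looks like this (a sketch adapted from that issue; dt_start is the datetime object from Step 1):

def on_assign(consumer, partitions):
    # Put the desired start timestamp (in ms) into each assigned partition's
    # offset field, ask the broker for the matching offsets, then re-assign.
    for p in partitions:
        p.offset = int(dt_start.timestamp() * 1000)
    offsets = consumer.offsets_for_times(partitions)
    consumer.assign(offsets)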
Step 5: Instead of using seek, here we use subscribe with both the topic (in list form) and on_assign to fetch the offset for the desired starting time. And we need to call close() after fetching.
One more detail worth noting is how to get the offset number.
In kafka-python, offset_start is a dictionary, so you use offset_start[tp].offset.
offset_start = consumer.offsets_for_times({tp:dt_start.timestamp() * 1000})
# to print out the offset number
offset_start[tp].offset
Output: (notice the { } indicating type is dict)

In confluent-kafka, offset_start is a list, so you need to use offset_start[0].offset.
tp_in = TopicPartition(topic=topic, partition=partition,
offset=int(dt_start.timestamp() * 1000))
offset_start = c.offsets_for_times([tp_in])
# to print out the offset number
offset_start[0].offset
Output: (notice the [ ] indicating type is list)

Alrighty, here's the full code for the confluent-kafka implementation →
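A minimal sketch of that implementation, under the same placeholder setup as Solution 1 (broker, group id, security settings, and JSON-encoded payloads are assumptions):

from datetime import datetime, timezone
import json
import pandas as pd
from confluent_kafka import Consumer, TopicPartition

# Setup (placeholder values -- use your own)
topic = 'analytics__pageview'
bootstrap_servers = 'broker1.example.com:9092'
group_id = 'test-event-checker'
partition = 0

# Step 1: datetime objects to bound the search range (UTC)
dt_start = datetime(2022, 9, 22, 12, 0, tzinfo=timezone.utc)
dt_end = datetime(2022, 9, 22, 14, 0, tzinfo=timezone.utc)

# Step 3: the consumer takes a config dict ('key': value instead of key=value)
c = Consumer({
    'bootstrap.servers': bootstrap_servers,
    'group.id': group_id,
    'security.protocol': 'SSL',   # assumption -- match your cluster's setting
    'enable.auto.commit': False,
})

# Step 4: find the offset that corresponds to the ending time for our partition
tp_end = TopicPartition(topic, partition, int(dt_end.timestamp() * 1000))
offset_end = c.offsets_for_times([tp_end])[0].offset

# Step 4c: on_assign rewinds every assigned partition to the starting time
def on_assign(consumer, partitions):
    for p in partitions:
        p.offset = int(dt_start.timestamp() * 1000)
    consumer.assign(consumer.offsets_for_times(partitions))

# Step 5: subscribe (topic must be a list!), poll until we pass offset_end, then close
c.subscribe([topic], on_assign=on_assign)
records = []
while True:
    msg = c.poll(timeout=10.0)
    if msg is None:                    # no message within the timeout -- stop
        break
    if msg.error():
        continue
    if msg.partition() != partition:   # keep only the partition chosen in Step 2
        continue
    if msg.offset() >= offset_end:
        break
    records.append(json.loads(msg.value()))  # assumption: event payloads are JSON
c.close()

df = pd.DataFrame(records)
df.head()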
Summary
- It is a trend to build event-driven applications, and I foresee a rising need for Data Scientists to be able to quickly process and do simple exploratory analysis on event data. This can help inform which data fields should be further transformed and introduced into the ETL pipeline, which would probably involve Faust and ksql rather than Pandas.