
How to Read Kafka Clickstream Event Data in Pandas

Build Kafka consumer to read event data with a specific date range in Jupyter notebook

Photo by Jonatan Pie on Unsplash

Background

Recently, I was assigned a seemingly straightforward task →

"We started to capture events from a new app. Could you verify such test events landed in Kafka properly?"

I have a rough idea of how data flows in our system: clickstream data collected from both web and mobile apps flows to MetaRouter first, and MetaRouter then acts as a Kafka producer, publishing event logs to a certain Kafka topic. The events generated from our new app share a writeKey. Therefore, in order to read those events, I would need to:

  1. Create a Kafka consumer to listen to this Kafka topic
  2. Since I know the test events were produced in a certain date range, build the Kafka consumer to read only events from the specified dates.
  3. Store the data in a format that I can filter and analyze, ideally a pandas DataFrame.

So my goal is to achieve the data flow from Kafka -> Pandas!

After numerous days of googling around StackOverflow, GitHub, and various sites, I finally got it working! Below are my code snippets showing how to implement it with two of the most popular Python libraries for Kafka.


Solution 1: Using kafka-python library

Prerequisite: pip install kafka-python (The latest version 2.0.2 is used in my notebook)

First, we need to import the libraries below and set the Kafka environment variables. This setup script can be reused in Solution 2 with one minor change.
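As a reference, a minimal sketch of that setup script might look like the following; the broker address, topic name, and group id are placeholder values, not the ones from my environment:

from datetime import datetime, timezone

import pandas as pd
from kafka import KafkaConsumer, TopicPartition   # in Solution 2 this import changes

# Kafka environment variables (placeholder values -- replace with your own)
bootstrap_servers = "broker-1.example.com:9092"   # your Kafka broker(s)
topic = "analytics__pageview"                     # topic that MetaRouter produces to
group_id = "clickstream-debug-notebook"           # any group id works for an ad-hoc read
security_protocol = "SSL"                         # whatever your cluster requires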

Next, build a Kafka consumer to read events from a specific datetime range. There are 5 steps:

  • Step 1: Since I know the test events were sent on 2022-09-22 between 12:00 and 14:00 (UTC), I use datetime to create dt_start and dt_end to bound the time range.
  • Step 2: In Kafka, only events within the same partition are ordered, so we need to read events from a specified partition. (Say you have 6 partitions for the topic; you can pick any number from 0–5 as the partition.)
  • Step 3: The basic consumer requires topic, bootstrap_servers, and group_id. I found that in Jupyter Notebook, if I don’t provide security_protocol, it throws errors.
  • Step 4: This is the key! The way it works is like this:
  • the datetime object gets converted to a UTC timestamp in milliseconds, which then gets converted to the corresponding offset number in a TopicPartition
  • the essential function is consumer.offsets_for_times({tp: dt_start.timestamp() * 1000})
  • Step 5: Use seek to fetch events starting from the desired starting time
  • each message has an offset property, and we compare it with the offset of the desired ending time to decide whether to continue or break

Enough talking; here’s the full code →
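The sketch below walks through the 5 steps end to end; the broker address, topic name, and group id are placeholders, and the event payloads are assumed to be JSON-encoded:

import json
from datetime import datetime, timezone

import pandas as pd
from kafka import KafkaConsumer, TopicPartition

# Step 1: bound the search window (2022-09-22, 12:00 to 14:00 UTC)
dt_start = datetime(2022, 9, 22, 12, 0, 0, tzinfo=timezone.utc)
dt_end = datetime(2022, 9, 22, 14, 0, 0, tzinfo=timezone.utc)

# Step 2: pick one partition of the topic (events are only ordered within a partition)
topic = "analytics__pageview"   # placeholder topic name
partition = 0
tp = TopicPartition(topic, partition)

# Step 3: basic consumer; security_protocol is required in my notebook environment
consumer = KafkaConsumer(
    bootstrap_servers="broker-1.example.com:9092",  # placeholder broker address
    group_id="clickstream-debug-notebook",          # placeholder group id
    security_protocol="SSL",
    enable_auto_commit=False,
    consumer_timeout_ms=10000,  # stop iterating if no new messages arrive
)
consumer.assign([tp])

# Step 4: datetime -> UTC timestamp in milliseconds -> offset in the TopicPartition
offset_start = consumer.offsets_for_times({tp: int(dt_start.timestamp() * 1000)})
offset_end = consumer.offsets_for_times({tp: int(dt_end.timestamp() * 1000)})

# Step 5: seek to the starting offset, then read until we pass the ending offset
consumer.seek(tp, offset_start[tp].offset)

records = []
for msg in consumer:
    if msg.offset >= offset_end[tp].offset:   # assumes events also exist after dt_end
        break
    records.append(json.loads(msg.value))     # assumes JSON-encoded payloads

consumer.close()

# Land the events in a pandas DataFrame for filtering and analysis
df = pd.DataFrame(records)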

And after that, I can query the writeKey of our new app in Pandas! 🐼

The above solution is inspired by a similar question on StackOverflow. In fact, that is where I started my immense search and found there’s no equivalent solution using confluent-kafka. Since my original code is based on confluent-kafka instead of kafka-python, I was puzzled by their seeming similarity yet nuanced differences.

Now I’m happy to introduce my own solution using confluent-kafka😃 ~~~


Solution 2: Using confluent-kafka library

Prerequisite: pip install confluent-kafka (The latest version 1.9.2 is used in my notebook)

Here you can use the same setup script from Solution 1 with one minor change:

  • change line 10 to from confluent_kafka import Consumer, TopicPartition

Next, we need to build a Kafka consumer to read events from a specific datetime range. At a high level, we still need the same 5 steps, but the main difference is that we need to use on_assign to achieve what seek does: fetch a specific offset from a TopicPartition.

Step 1: Same as solution 1, we need datetime objects to bound the search range.

Step 2: Same as Solution 1. One tricky thing is that most of the time you can use a string as the topic (e.g. topic = 'analytics__pageview'), but when you want to subscribe, it only accepts a list, such as consumer.subscribe(['analytics__pageview'])! (As Dumbledore might say: "How peculiar~~" 🧙)

Step 3: Almost identical to Solution 1, except that the consumer configuration is passed as a dict, so = becomes : in the assignments.

Step 4: Here’s where you’ll see the nuanced differences! We need an extra step 4c to construct on_assign. This function originally comes from a confluent-kafka-python GitHub issue, provided by Magnus Edenhill.
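For reference, a sketch of that callback might look like this, assuming offset_start has already been computed with offsets_for_times as in Step 4 (the function name here is my own):

def on_assign(consumer, partitions):
    # Rewind every assigned partition to the offset corresponding to dt_start,
    # which was looked up earlier via consumer.offsets_for_times()
    for p in partitions:
        p.offset = offset_start[0].offset
    consumer.assign(partitions)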

Step 5: Instead of using seek, here we use subscribe with both the topic (in list form) and on_assign to fetch the offset of the desired starting time. And we need to call close() after fetching.

One more detail worth noting is how to get the offset number.

In kafka-python, you use offset_start[tp].offset, since offset_start is a dictionary.

offset_start = consumer.offsets_for_times({tp:dt_start.timestamp() * 1000})
# to print out the offset number
offset_start[tp].offset

Output: (notice the { } indicating the type is a dict)

In confluent-kafka, offset_start is a list, so you need to use offset_start[0].offset

tp_in = TopicPartition(topic=topic, partition=partition, 
                       offset=int(dt_start.timestamp() * 1000))
offset_start = c.offsets_for_times([tp_in])
# to print out the offset number
offset_start[0].offset

Output: (notice the [ ] indicating the type is a list)

Alrighty, here’s the full code for implementing this with confluent-kafka →
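The sketch below follows the same 5 steps; as before, the broker address, topic name, and group id are placeholders, and the payloads are assumed to be JSON-encoded:

import json
from datetime import datetime, timezone

import pandas as pd
from confluent_kafka import Consumer, TopicPartition

# Step 1: bound the search window (2022-09-22, 12:00 to 14:00 UTC)
dt_start = datetime(2022, 9, 22, 12, 0, 0, tzinfo=timezone.utc)
dt_end = datetime(2022, 9, 22, 14, 0, 0, tzinfo=timezone.utc)

# Step 2: topic and partition (subscribe() will need the topic wrapped in a list)
topic = "analytics__pageview"   # placeholder topic name
partition = 0

# Step 3: configuration is a dict here (":" instead of "=")
c = Consumer({
    "bootstrap.servers": "broker-1.example.com:9092",  # placeholder broker address
    "group.id": "clickstream-debug-notebook",          # placeholder group id
    "security.protocol": "SSL",
    "enable.auto.commit": False,
})

# Step 4: datetime -> UTC timestamp in milliseconds -> offsets in the TopicPartition
tp_in_start = TopicPartition(topic=topic, partition=partition,
                             offset=int(dt_start.timestamp() * 1000))
tp_in_end = TopicPartition(topic=topic, partition=partition,
                           offset=int(dt_end.timestamp() * 1000))
offset_start = c.offsets_for_times([tp_in_start])
offset_end = c.offsets_for_times([tp_in_end])

# Step 4c: on_assign rewinds the assigned partition to the starting offset
def on_assign(consumer, partitions):
    for p in partitions:
        p.offset = offset_start[0].offset
    consumer.assign(partitions)

# Step 5: subscribe (topic must be in a list!) and poll until we pass the ending offset
c.subscribe([topic], on_assign=on_assign)

records = []
while True:
    msg = c.poll(timeout=10.0)
    if msg is None:          # nothing received within the timeout window
        break
    if msg.error():
        continue
    if msg.offset() >= offset_end[0].offset:   # assumes events also exist after dt_end
        break
    records.append(json.loads(msg.value()))    # assumes JSON-encoded payloads

c.close()

df = pd.DataFrame(records)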

Summary

  • Building event-driven applications is a growing trend, and I foresee a rising need for Data Scientists to be able to quickly process and do simple exploratory analysis on event data. This can help inform which data fields should be further transformed and introduced into the ETL pipeline, which would probably involve Faust and ksql rather than Pandas.
