Make a mock "real-time" data stream with Python and Kafka

A Dockerized tutorial with everything you need to turn a .csv file of timestamped data into a Kafka stream

Abraca-Kafka! (Poorly drawn cartoon by author.)

With more and more Data Science work moving towards real-time pipelines, data scientists need to learn how to write streaming analytics. And while some great, user-friendly streaming data pipeline tools exist (my obvious favorite being Apache Kafka), it's hard to develop code for a streaming analytic without a friendly dev environment that actually produces a data stream you can test your analytics on.

A simple recipe for a real-time data stream

This post will walk through deploying a simple Python-based Kafka producer that reads from a .csv file of timestamped data, turns the data into a real-time (or, really, "back-in-time") Kafka stream, and allows you to write your own consumer for applying functions/transformations/Machine Learning models/whatever you want to the data stream.

Ingredients

All materials are available in my GitHub time-series-kafka-demo repo. To follow along, clone the repo to your local environment. You can run the example with only Docker and Docker Compose on your system.

The repo has a few different components: a Docker Compose file for standing up the Kafka broker and Zookeeper, a Dockerfile and requirements.txt for building an image for the producer and consumer scripts, the producer script (bin/sendStream.py), the consumer script (bin/processStream.py), and a sample time series in data/data.csv.

Directions

Clone the repo and cd into the directory.

git clone https://github.com/mtpatter/time-series-kafka-demo.git
cd time-series-kafka-demo

Start the Kafka broker and Zookeeper

The Compose file pulls Docker images for Kafka and Zookeeper version 6.2.0 from Confluent’s Docker Hub repository. (Gotta pin your versions!)

docker compose up

This starts both Kafka and Zookeeper on the same Docker network so they can talk to each other. The Kafka broker will be accessible locally on port 9092, since the Compose file binds that local port to the container's internal port.
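For reference, a Compose file for this kind of setup might look roughly like the following. This is only a sketch (the service names, listener settings, and versions here are assumptions); the repo's docker-compose.yml is the source of truth.

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:6.2.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"    # expose the broker on localhost:9092
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1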

Build a Docker image (optionally, for the producer and consumer)

If you'd rather not install the Python modules from the requirements.txt file, you can use a Docker image for the producer and consumer scripts.
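The Dockerfile for that image can stay small. A sketch of what it might contain (a standard Python base image with the dependencies installed; check the repo for the actual file):

# Sketch of a Dockerfile for the producer/consumer image.
FROM python:3.9-slim

# /home matches the -v $PWD:/home mount used in the run commands below.
WORKDIR /home
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .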

From the main root directory:

docker build -t "kafkacsv" .

This command should now work:

docker run -it --rm kafkacsv python bin/sendStream.py -h

Start a consumer

We'll start a consumer first, to print all messages in mock "real time" from the stream "my-stream". The reason we start the consumer before the producer is that the producer reproduces all the "pauses" in time between each of the timestamped data points. If you start the consumer after the producer, it will immediately process all the messages that are already in the queue, with no pauses. But go ahead and do that if you like. ¯\_(ツ)_/¯

python bin/processStream.py my-stream

or with Docker:

docker run -it --rm \
      -v $PWD:/home \
      --network=host \
      kafkacsv python bin/processStream.py my-stream

The relevant code for the main function of the consumer is below. Note that I catch the unknown topic error message and let the consumer create the new topic. Note also that the keyboard interrupt will help you shut down the consumer for this example.
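In outline, that main loop looks something like this. It's a minimal sketch assuming the confluent-kafka client and a broker on localhost:9092; the repo's actual script handles argument parsing and configuration more carefully.

from confluent_kafka import Consumer, KafkaError, KafkaException


def msg_process(msg):
    # Placeholder; a fuller version is shown later in the post.
    print(msg.value().decode('utf-8'))


def main():
    conf = {
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'my-group',
        'auto.offset.reset': 'earliest',
        'allow.auto.create.topics': True,   # let the topic be created on first use
    }
    consumer = Consumer(conf)
    consumer.subscribe(['my-stream'])

    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                # An "unknown topic" error just means the topic doesn't exist
                # yet, so keep polling until it's created.
                if msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                    continue
                raise KafkaException(msg.error())
            msg_process(msg)
    except KeyboardInterrupt:
        pass  # Ctrl+C shuts the consumer down for this example
    finally:
        consumer.close()


if __name__ == '__main__':
    main()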

Produce the time series to a data stream

If you check out the data file, it’s got two columns: a timestamp and a (randomly produced) value for data.

timestamp,value
2021-01-01 00:00:00,51
2021-01-01 00:00:04,60
2021-01-01 00:00:06,82
2021-01-01 00:00:07,86
2021-01-01 00:00:11,99
2021-01-01 00:00:12,23
2021-01-01 00:00:21,63
2021-01-01 00:00:23,20
2021-01-01 00:00:24,32

Now (in another terminal shell window) we'll send the time series from data/data.csv to the topic "my-stream." By default, the second message will be sent 4 seconds after the first, the third message will be sent 2 seconds after that, and so on. It's like we're going back in time! (Cue Huey Lewis and the News.)

python bin/sendStream.py data/data.csv my-stream

or with Docker:

docker run -it --rm \
      -v $PWD:/home \
      --network=host \
      kafkacsv python bin/sendStream.py data/data.csv my-stream

You could also speed up the stream because why not. Here’s the command for a factor of 10 increase in speed:

python bin/sendStream.py data/data.csv my-stream --speed 10

The relevant code from the main function that shows the conversion of the time series data to json is below. We do this because it preserves the types; in this case, the floats in the data values will stay floats.
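Here's a minimal sketch of that loop, assuming the confluent-kafka Producer and the standard library's csv and json modules (the function name and defaults are illustrative; the repo's script also handles argument parsing and delivery callbacks):

import csv
import json
import time
from datetime import datetime

from confluent_kafka import Producer


def stream_csv(filename, topic, speed=1.0):
    producer = Producer({'bootstrap.servers': 'localhost:9092'})

    with open(filename) as f:
        reader = csv.DictReader(f)
        prev_ts = None
        for row in reader:
            ts = datetime.fromisoformat(row['timestamp'])
            if prev_ts is not None:
                # Reproduce the gap between consecutive timestamps,
                # optionally compressed by the --speed factor.
                time.sleep((ts - prev_ts).total_seconds() / speed)
            prev_ts = ts

            # Serialize as json so the numeric value keeps its type
            # (floats stay floats) on the consumer side.
            value = json.dumps({row['timestamp']: float(row['value'])})
            producer.produce(topic, value=value)
            producer.flush()   # send each message right away to preserve the timing


if __name__ == '__main__':
    stream_csv('data/data.csv', 'my-stream', speed=1.0)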

If you watch the output of the consumer, it will look something like this:

python bin/processStream.py my-stream
2021-08-27 15:54:44 {'2021-01-01 00:00:00': 51.0}
2021-08-27 15:54:48 {'2021-01-01 00:00:04': 60.0}
2021-08-27 15:54:50 {'2021-01-01 00:00:06': 82.0}
2021-08-27 15:54:51 {'2021-01-01 00:00:07': 86.0}
2021-08-27 15:54:55 {'2021-01-01 00:00:11': 99.0}
2021-08-27 15:54:56 {'2021-01-01 00:00:12': 23.0}
2021-08-27 15:55:05 {'2021-01-01 00:00:21': 63.0}
2021-08-27 15:55:07 {'2021-01-01 00:00:23': 20.0}
2021-08-27 15:55:08 {'2021-01-01 00:00:24': 32.0}

Check it out! This output is the result of the msg_process() function, which just prints the current time and the json data with key of timestamp and value of the numeric value. Note that the messages were received with those pauses preserved. Here’s what the msg_process() function looks like:
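In sketch form (assuming the confluent-kafka Message API, whose .value() returns the raw message bytes):

import json
from datetime import datetime


def msg_process(msg):
    # Print the wall-clock arrival time next to the decoded json payload.
    time_start = datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S')
    val = json.loads(msg.value().decode('utf-8'))
    print(time_start, val)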

The actual data input (the json with key of timestamp and value of value, probably should have named that differently) is accessible in each Kafka message by calling .value().

The consumer I’ve provided here is obviously very simple since all it’s doing is printing to the screen, but you can easily swap out the msg_process() function for whatever you want to apply to each data point. I recommend using json (or Apache Avro, if you want to get fancy with schemas, etc.) for message values since, like I said, it will preserve types.
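For instance, a hypothetical drop-in replacement that keeps a running mean of the streamed values instead of printing each one might look like this (illustration only; none of these names come from the repo):

import json

# Hypothetical replacement for msg_process(): accumulate a running mean.
state = {'count': 0, 'total': 0.0}


def msg_process(msg):
    payload = json.loads(msg.value().decode('utf-8'))
    value = next(iter(payload.values()))   # the numeric data value
    state['count'] += 1
    state['total'] += value
    print('running mean:', state['total'] / state['count'])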

Shut down and clean up

Stop the consumer / producer with Return and Ctrl+C.

Shut down the Kafka broker system:

docker compose down

And that’s it! Hopefully, that’s all you need to get started with writing your own streaming data analytics in Python.

