
A Performant Way to Query Streaming Data

Improving a real-time streaming project with ksqlDB

North Carolina. Image by Author.

In a previous article, we explored the components of a real-time streaming project that consumes and processes smartphone sensor data with FastAPI, Kafka, QuestDB, and Docker. That project was a first pass at implementing an architecture that can move streaming data from smartphones, through a Kafka log, and into a time series database, where the data can be readily queried and processed. The end product was a dashboard that polled the database and displayed sensor readings in near real-time:

Streaming Smartphone Data with FastAPI, Kafka, QuestDB, and Docker

One criticism of the project was the unnecessary latency introduced by writing data from Kafka to the database and then querying the database to display the most recent sensor readings. When the primary objective is to analyze data in near real-time, writing to and reading from a database becomes inefficient.

This is one of the problems that ksqlDB was created to solve. Instead of writing data to the database and querying it for analysis, ksqlDB enables direct processing and analysis of data streams, eliminating the need to persist data in a database before accessing it.

This article expands on the previous one by introducing ksqlDB for querying and processing streaming data. Unlike traditional database polling, implementing push queries in ksqlDB significantly reduces latency in the dashboard and simplifies the backend infrastructure. All code used to build this project is available on GitHub:

GitHub – hfhoffman1144/smartphone_sensor_stream2: Stream smartphone sensor data with FastAPI…

The End Product

The objective of this project is the same as before: to develop a real-time dashboard that visualizes sensor data. However, in this iteration, our focus is on minimizing the perceptible latency between the phone and the dashboard by harnessing the power of ksqlDB. Here’s what the new dashboard should look like:

Smartphone Accelerometer Data Queried from ksqlDB. Image by Author.

Tri-axial accelerometer data from the smartphone is sent to a FastAPI app, written to Kafka, queried with ksqlDB, and displayed in the dashboard. Notice how quickly the plot responds to the phone’s movement – the delay is nearly undetectable.

This project also supports streaming from multiple smartphones:

Streaming Data from Two Smartphones Simultaneously. Image by Author.

Project Architecture

The architecture for this project is simpler than before because QuestDB and its consumer are no longer required to get data to the dashboard.

Project Architecture. Image by Author (made with Lucidchart).

Each smartphone sends sensor readings (accelerometer, gyroscope, and magnetometer) via POST request to a FastAPI application (the producer). The producer reformats the request body to a ksqlDB-compatible JSON format and asynchronously writes sensor readings to a Kafka topic. Once sensor data arrives in the Kafka topic, it can be readily queried with ksqlDB.

To obtain a continuous stream of sensor data, the client can establish a server-sent event (SSE) connection with the backend (a FastAPI application). The backend initiates a push query through a ksqlDB API that continuously sends sensor data to the frontend.
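
To make the SSE flow concrete, here is a minimal sketch of a Python client consuming that endpoint with the requests library. The URL and the "demo-client" ID are assumptions based on the docker-compose configuration shown later; the real frontend does this in JavaScript:

import json
import requests

# Assumed address of the dashboard backend; "demo-client" is a placeholder ID
SSE_URL = "http://localhost:5000/chart-data/demo-client"

with requests.get(SSE_URL, stream=True) as response:
    for line in response.iter_lines(decode_unicode=True):
        # SSE frames are "field: value" lines; sensor rows arrive as "data: [...]"
        if line and line.startswith("data:"):
            device_id, time, x, y, z = json.loads(line[len("data:"):].strip())
            print(device_id, time, x, y, z)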

Here’s the directory for the project:

├── dashboard_backend
│   ├── Dockerfile
│   ├── app
│   │   ├── core
│   │   │   ├── config.py
│   │   │   └── utils.py
│   │   ├── db
│   │   │   └── data_api.py
│   │   ├── main.py
│   │   └── models
│   │       └── sensors.py
│   ├── entrypoint.sh
│   └── requirements.txt
├── dashboard_frontend
│   ├── Dockerfile
│   ├── app
│   │   ├── main.py
│   │   ├── static
│   │   │   └── js
│   │   │       └── main.js
│   │   └── templates
│   │       └── index.html
│   ├── entrypoint.sh
│   └── requirements.txt
├── producer
│   ├── Dockerfile
│   ├── app
│   │   ├── __init__.py
│   │   ├── core
│   │   │   ├── config.py
│   │   │   └── utils.py
│   │   ├── main.py
│   │   └── schemas
│   │       └── sensors.py
│   ├── entrypoint.sh
│   └── requirements.txt
├── docker-compose.yml

Three FastAPI applications are written to facilitate data flow and visualization: the producer, the dashboard frontend, and the dashboard backend. These apps, along with Zookeeper, Kafka, and ksqlDB, are orchestrated via docker-compose:

version: '3.8'

services:

  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    image: bitnami/kafka:latest
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_STREAMS_COMMIT_INTERVAL_MS: 100
      KSQL_KSQL_IDLE_CONNECTION_TIMEOUT_SECONDS: 600

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - kafka
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  producer:
    build:
      context: ./producer
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 8000
    ports:
      - 8000:8000
    env_file:
      - .env
    depends_on:
      - kafka
      - zookeeper

  dashboard_backend:
    build:
      context: ./dashboard_backend
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 5000
    ports:
      - 5000:5000
    env_file:
      - .env
    depends_on:
      - ksqldb-server

  dashboard_frontend:
    build:
      context: ./dashboard_frontend
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 4200
    ports:
      - 4200:4200
    env_file:
      - .env
    depends_on:
      - dashboard_backend

Notice the four services not explicitly written in code (thankfully): Zookeeper, Kafka, ksqlDB, and the ksqlDB CLI. These services work together with the producer and dashboard to create the project. Let's explore these components in more detail.

The Producer

Similar to before, the producer is a FastAPI app that accepts data sent from smartphones (over POST request) and writes to a Kafka log. Here’s the directory structure:

producer
    ├── Dockerfile
    ├── app
    │   ├── __init__.py
    │   ├── core
    │   │   ├── config.py
    │   │   └── utils.py
    │   ├── main.py
    │   └── schemas
    │       └── sensors.py
    ├── entrypoint.sh
    └── requirements.txt

We won’t go through every file in the producer directory since everything is available on GitHub. Instead, let’s take a look at main.py (the driving script of the producer API):

# producer/app/main.py
import json
from fastapi import FastAPI
import asyncio
from aiokafka import AIOKafkaProducer
from schemas.sensors import SensorReading, SensorResponse
from core.config import app_config
from core.utils import flatten_dict
from loguru import logger

# Instantiate FastAPI app
app = FastAPI(title=app_config.PROJECT_NAME)

# Create the event loop to use async programming
loop = asyncio.get_event_loop()

# Instantiate the Kafka producer object
producer = AIOKafkaProducer(
    loop=loop,
    client_id=app_config.PROJECT_NAME,
    bootstrap_servers=app_config.KAFKA_URL
)

@app.on_event("startup")
async def startup_event():

    await producer.start()
    await producer.send(app_config.TOPIC_NAME, json.dumps({'status':'ready'}).encode("ascii"))

@app.on_event("shutdown")
async def shutdown_event():
    await producer.stop()

@app.post("/phone-producer/")
async def kafka_produce(data: SensorReading):

    """
    Produce a message containing readings from a smartphone sensor to Kafka.

    Parameters
    ----------
    data : SensorReading
        The request body containing sensor readings and metadata.

    Returns
    -------
    response : SensorResponse
        The response body corresponding to the processed sensor readings
        from the request.
    """

    # Extract the messageId, deviceId, and sessionId
    message_info = data.dict().copy()
    message_info.pop('payload')

    # Write each sensor reading in the payload to kafka
    for sensor_reading in data.dict()['payload']:

        kafka_message = {**flatten_dict(sensor_reading), **message_info}
        await producer.send(app_config.TOPIC_NAME,
                            json.dumps(kafka_message).encode("ascii"))

    response = SensorResponse(
        messageId=data.messageId,
        sessionId=data.sessionId,
        deviceId=data.deviceId
    )

    logger.info(response)

    return response

The explanation of this code is largely the same as in the previous article. The main difference is that sensor readings in the request payload need to be reformatted before they are written to Kafka. This new format allows the sensor data to be queried in a SQL-like fashion by ksqlDB. Each POST request from the phone sends JSON data that looks similar to this:

{"messageId": 20,
 "sessionId": "4bf3b3b9-a241-4aaa-b1d3-c05100df9976",
 "deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b",
 "payload": [{"name": "accelerometeruncalibrated",
              "time": "1671406719721160400",
              "values": {"z": -0.9372100830078125,
                         "y": -0.3241424560546875, 
                         "x": 0.0323486328125}},
             {"name": "magnetometeruncalibrated",
              "time": "1671406719726579500",
              "values": {"z": -5061.64599609375,
                         "y": 591.083251953125,
                         "x": 3500.541015625}},
             {"name": "gyroscopeuncalibrated",
              "time": "1671406719726173400",
              "values": {"z": -0.004710599314421415,
                         "y": -0.013125921599566936,
                         "x": 0.009486978873610497}}, 
...
]}

Individual sensor readings are located under "payload" and are written to Kafka in the kafka_produce() route:

# Extract the messageId, deviceId, and sessionId
message_info = data.dict().copy()
message_info.pop('payload')

# Write each sensor reading in the payload to kafka
for sensor_reading in data.dict()['payload']:

    kafka_message = {**flatten_dict(sensor_reading), **message_info}
    await producer.send(app_config.TOPIC_NAME,
                        json.dumps(kafka_message).encode("ascii"))

The flatten_dict() function located in producer/app/core/utils.py takes a raw sensor message in the payload, for instance:

{
  "name": "accelerometeruncalibrated",
  "time": "1683555956851304200",
  "values": {
    "z": -1.0012664794921875,
    "y": -0.467315673828125,
    "x": -0.00494384765625
  }
}

And reformats the message to be compatible with a ksqlDB schema – this is what is written to Kafka:

{
  "name": "accelerometeruncalibrated",
  "time": "1683555956851304200",
  "values_z": -1.0012664794921875,
  "values_y": -0.467315673828125,
  "values_x": -0.00494384765625,
  "messageId": 35,
  "sessionId": "c931f349-faf5-4e45-b09f-c623a76ef93a",
  "deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b"
}

Each entry in the reformatted sensor reading can be thought of as a column that can be queried by ksqlDB. More on this in the next section.
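
The exact implementation of flatten_dict() lives in the GitHub repo; a minimal sketch of the idea, assuming a simple recursive flattener, could look like this:

# producer/app/core/utils.py (illustrative sketch, not the verbatim code)
def flatten_dict(d: dict, parent_key: str = '', sep: str = '_') -> dict:
    """Recursively flatten nested dicts by joining keys with `sep`."""
    items = {}
    for key, value in d.items():
        new_key = f'{parent_key}{sep}{key}' if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested dicts like "values"
            items.update(flatten_dict(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items

# flatten_dict({'values': {'x': 0.03}}) returns {'values_x': 0.03}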

ksqlDB

At this point, sensor readings can flow from smartphones to the FastAPI producer, where they are written to Kafka in a ksqlDB-compatible format. ksqlDB can then query recent and historical data in Kafka.

ksqlDB is an open-source streaming engine designed to process, analyze, and transform real-time data streams from Kafka using a SQL-like syntax. Put simply, ksqlDB enables interaction with data in Kafka topics using familiar relational database concepts like tables, materialized views, queries, joins, and aggregations. The capabilities of ksqlDB are extensive and won't be fully covered in this article, but the documentation provides good definitions of the core concepts:

ksqlDB Concepts – ksqlDB Documentation

For this project, a stream is created over the topic that stores smartphone sensor readings (the data written by the producer):

CREATE STREAM smartphone_sensor_stream (
    name VARCHAR,
    time BIGINT,
    values_x DOUBLE,
    values_y DOUBLE,
    values_z DOUBLE,
    messageId BIGINT,
    sessionId VARCHAR,
    deviceId VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'smartphone-sensor-data',
    VALUE_FORMAT = 'JSON'
  );

The above ksqlDB statement creates a stream, smartphone_sensor_stream, that can be used to query sensor readings written to the smartphone-sensor-data Kafka topic. Libraries like ksql-python can be leveraged to interface with the ksqlDB REST API and programmatically execute queries:

from ksql import KSQLAPI # pip install ksql

# Where ksqldb is running
KSQL_URL = "http://localhost:8088" 

# Instantiate the ksqldb API object
client = KSQLAPI(KSQL_URL)

# Create the "smartphone_sensor_stream" stream over the specified topic
client.create_stream(table_name="smartphone_sensor_stream",
                     columns_type=["name varchar",
                                   "time bigint",
                                   "values_x double",
                                   "values_y double",
                                   "values_z double",
                                   "messageId bigint",
                                   "sessionId varchar",
                                   "deviceId varchar"
                                   ],
                     topic="smartphone-sensor-data",
                     value_format="JSON") 

A push query is executed to retrieve sensor readings as they are written to the topic. In essence, a push query opens a long-lived connection that sends updates to a client any time new data is received in the topic. This makes push queries a good choice for streaming smartphone data.

select deviceId,
       time,
       values_x,
       values_y,
       values_z
from smartphone_sensor_stream
where name = 'accelerometeruncalibrated'
emit changes

The above query "pushes" the device ID, time, and accelerometer values from smartphone_sensor_stream every time the stream is updated with data. This can be executed with ksql-python:

from ksql import KSQLAPI
from typing import Generator

# Where ksqldb is running
KSQL_URL = "http://localhost:8088" 

# Instantiate the ksqldb API object
client = KSQLAPI(KSQL_URL)

# Write a push query 
push_query = '''select deviceId,
                        time,
                        values_x,
                        values_y,
                        values_z
                    from smartphone_sensor_stream
                    where name = 'accelerometeruncalibrated'
                    emit changes
                   '''
# Get the KSQL stream generator
sensor_push_stream: Generator = client.query(push_query, use_http2=True)

# Loop through messages in the generator and print them as they're received
for raw_message in sensor_push_stream:

    print(raw_message) 

The ksql-python client returns a generator object that yields messages as they are written to Kafka and read from the stream. Unlike a conventional for loop that iterates over a fixed-size array, this loop will continue executing as long as data is received in the stream.

The messages yielded from the query look similar to the following:

[
 '86a5b0e3-6e06-40e2-b226-5a72bd39b65b', # Device ID
 1684615020438850600,                    # Timestamp of the sensor recording
 0.993927001953125,                      # x value
 -0.5736083984375,                       # y value
 -0.1787261962890625                     # z value
]

One message is returned at a time, and each can be thought of as a row in the smartphone_sensor_stream stream. Keep in mind that ksqlDB can perform more complex queries, such as aggregates and joins, but for this project only a basic select is needed.
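
As an illustration of what a richer query could look like (this one is not used in the project), a windowed aggregate push query can compute a rolling per-device average, reusing the client object from above; the 5-second window size is an arbitrary choice:

# Hypothetical aggregate push query: 5-second average acceleration per device
agg_query = '''select deviceId,
                      avg(values_x) as avg_x,
                      avg(values_y) as avg_y,
                      avg(values_z) as avg_z
                 from smartphone_sensor_stream
               window tumbling (size 5 seconds)
                where name = 'accelerometeruncalibrated'
                group by deviceId
                emit changes
              '''

for row in client.query(agg_query, use_http2=True):
    print(row)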

Dashboard Backend

The dashboard backend is a FastAPI app that accepts SSE requests for sensor data streams. Once an SSE connection is requested, a ksqlDB push query is opened and continuously sends messages to the frontend as they arrive in Kafka. The directory structure for the dashboard backend looks like this:

 dashboard_backend
   ├── Dockerfile
   ├── app
   │   ├── core
   │   │   ├── config.py
   │   │   └── utils.py
   │   ├── db
   │   │   └── data_api.py
   │   ├── main.py
   │   └── models
   │       └── sensors.py
   ├── entrypoint.sh
   └── requirements.txt

Let’s first take a look at data_api.py – the interface between ksqlDB and the dashboard backend:

# data_api.py

from retry import retry
from ksql import KSQLAPI
from models.sensors import SensorName
from typing import Generator

@retry()
def create_ksql_connection(url: str) -> KSQLAPI:
    """
    Create a connection to a KSQL server using the provided URL.

    Parameters
    ----------
    url : str
        The URL of the KSQL server to connect to.

    Returns
    -------
    KSQLAPI
        An instance of the `KSQLAPI` class representing the connection
        to the KSQL server.
    """

    return KSQLAPI(url)

def create_ksql_device_stream(client: KSQLAPI,
                              stream_name: str,
                              topic_name: str) -> None:
    """
    Creates a new device stream in KSQL server if it does not already exist.

    Parameters:
    -----------
    client : KSQLAPI
        A client instance of the KSQLAPI class to connect with KSQL server.
    stream_name : str
        The name of the device stream to create.
    topic_name : str
        The name of the Kafka topic to associate with the device stream.

    Returns:
    --------
    None

    Raises:
    -------
    KSQLServerError
        If there is an error while creating the stream in KSQL server.
    """

    # Get the current streams
    curr_streams = client.ksql('show streams')
    curr_stream_names = [stream['name'].lower()
                         for stream in curr_streams[0]['streams']]

    # If the device stream doesn't exist, create it
    if stream_name.lower() not in curr_stream_names:

        client.create_stream(table_name=stream_name,
                             columns_type=['name varchar',
                                           'time bigint',
                                           'values_x double',
                                           'values_y double',
                                           'values_z double',
                                           'messageId bigint',
                                           'sessionId varchar',
                                           'deviceId varchar'
                                           ],
                             topic=topic_name,
                             value_format='JSON')

def ksql_sensor_push(client: KSQLAPI,
                     stream_name: str,
                     sensor_name: SensorName) -> Generator:
    """
    Generator function that continuously pushes sensor data
    for a given sensor name from a KSQL server using the KSQL API client.

    Parameters:
    -----------
    client : KSQLAPI
        The KSQL API client instance used to query the KSQL server.
    stream_name : str
        The name of the KSQL stream to query data from.
    sensor_name : SensorName
        An enum value representing the name of the sensor to stream data for.

    Returns:
    --------
    Generator:
        A generator object that yields the sensor data as it is streamed in real-time.
    """

    push_query = f'''
                  select deviceId,
                         time,
                         values_x,
                         values_y,
                         values_z
                    from {stream_name}
                    where name = '{sensor_name.value}'
                    emit changes
                   '''

    sensor_push_stream: Generator = client.query(push_query, use_http2=True)

    return sensor_push_stream

In this script, create_ksql_device_stream() creates the ksqlDB stream defined in the previous section, and ksql_sensor_push() returns a generator that yields the results of the sensor data push query.
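
To see how these helpers fit together, here is a short usage sketch; the ksqlDB URL, stream name, and topic name are assumed from the docker-compose and config values used throughout the project:

from db.data_api import (create_ksql_connection,
                         create_ksql_device_stream,
                         ksql_sensor_push)
from models.sensors import SensorName

# Assumed values mirroring the project's configuration
client = create_ksql_connection('http://ksqldb-server:8088')
create_ksql_device_stream(client, 'smartphone_sensor_stream',
                          'smartphone-sensor-data')

# Print accelerometer rows as ksqlDB pushes them
for raw_message in ksql_sensor_push(client, 'smartphone_sensor_stream',
                                    SensorName.ACC):
    print(raw_message)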

Let’s break down the components of main.py – the driving script for the dashboard backend. Here are the dependencies:

# main.py

import pandas as pd
import json
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from fastapi.requests import Request
from starlette.middleware.cors import CORSMiddleware
from core.config import app_config
from core.utils import maybe_load_json
from models.sensors import SensorName
from db.data_api import (create_ksql_connection,
                         create_ksql_device_stream,
                         ksql_sensor_push)

The primary imports to note are EventSourceResponse (the class that implements SSE) and the functions from data_api.py. Next, a KSQLAPI object and a FastAPI app are instantiated:

# main.py

...

# Instantiate KSQLAPI object
KSQL_CLIENT = create_ksql_connection(app_config.KSQL_URL)

# Create the KSQL device stream if it doesn't exist
create_ksql_device_stream(
  KSQL_CLIENT, app_config.STREAM_NAME, app_config.TOPIC_NAME
)

# Instantiate FastAPI app
app = FastAPI()

# Configure middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Lastly, the SSE endpoint that sends sensor data from ksqlDB to the frontend is defined:

# main.py

...

# An SSE endpoint that pushes sensor data from KSQLDB to the client
@app.get("/chart-data/{client_id}")
async def message_stream(request: Request, client_id: str):
    async def event_generator():
        while True:
            # If the client closes the connection, stop sending events
            if await request.is_disconnected():
                break

            try:

                # Get the KSQL stream generator
                sensor_push_stream = ksql_sensor_push(
                    KSQL_CLIENT, app_config.STREAM_NAME, SensorName.ACC)

                for raw_message in sensor_push_stream:

                    # If client closes connection, stop sending events
                    if await request.is_disconnected():
                        break

                    # Check if the raw message is the correct format
                    message = maybe_load_json(raw_message)

                    # If the message is in the correct format (i.e. a list),
                    # send to client
                    if isinstance(message, list):

                        # Format the sensor timestamp
                        message[1] = str(pd.to_datetime(message[1]))

                        # Yield the message as JSON
                        yield {
                            "event": "new_message",
                            "id": "message_id",
                            "retry": 1500000,
                            "data": json.dumps(message)
                        }
            except Exception as e:

                if await request.is_disconnected():

                    break

                continue

    return EventSourceResponse(event_generator())

The message_stream() endpoint accepts GET requests that open long-lived connections through which the backend can continuously send sensor data. Within message_stream(), a coroutine called event_generator() is defined and returned. The purpose of event_generator() is to create a while loop that yields sensor data messages as they are processed by ksqlDB. The only time this loop terminates is when the client closes the connection.
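
On the wire, sse-starlette serializes each yielded dictionary into a standard SSE frame, so what the client receives for every sensor reading looks roughly like this (values shortened for readability):

event: new_message
id: message_id
retry: 1500000
data: ["86a5b0e3-...", "2023-05-20 20:37:00.438850600", 0.9939, -0.5736, -0.1787]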

Dashboard Frontend

The dashboard frontend is an HTML page hosted by a FastAPI application. Its purpose is to demonstrate that data is flowing through the components of this project; it is by no means a comprehensive frontend. The interface looks like this:

Displaying Smartphone Sensor Data Streams on the Dashboard. Image by Author.

When the user clicks "Start Streaming", an SSE connection is opened with the backend, and data from the push query is sent continuously. All code used to create the frontend, along with instructions on how to get everything running, is available on GitHub.

Final Thoughts

This article introduced ksqlDB as a solution to improve the real-time streaming project that was previously built using FastAPI, Kafka, QuestDB, and Docker. The project aimed to create a dashboard that visualizes sensor data in near real-time. One of the challenges faced was the unnecessary latency caused by writing data from Kafka to a database and querying the database for analysis.

ksqlDB, a database purpose-built for stream processing, was implemented to address this issue. Instead of persisting data in a database before accessing it, ksqlDB enables direct processing and analysis of data streams in Kafka. By implementing push queries in ksqlDB, the latency in the dashboard was significantly reduced, resulting in a nearly undetectable delay in displaying sensor readings.

The use of ksqlDB simplified the backend infrastructure and allowed for efficient analysis of sensor readings in near real-time. Future articles will continue making improvements on this project and enhancing its functionality with new tools and features. As always, feedback is greatly appreciated. Thanks for reading!


References

Apache Kafka: https://kafka.apache.org/

Event-Driven Architectures – The Queue vs The Log: https://jack-vanlightly.com/blog/2018/5/20/event-driven-architectures-the-queue-vs-the-log

Lucidchart: https://www.lucidchart.com/

Kafka Poc using FastApi: https://github.com/GavriloviciEduard/fastapi-kafka

geo-stream-kafka: https://github.com/iwpnd/geo-stream-kafka

18 Most Popular IoT Devices in 2022: https://www.softwaretestinghelp.com/iot-devices/

FastAPI: https://fastapi.tiangolo.com/

QuestDB: https://questdb.io/docs/

Row vs Column Oriented Databases: https://dataschool.com/data-modeling-101/row-vs-column-oriented-databases/

ksqlDB: https://docs.ksqldb.io/en/latest/

