In a previous article, we explored the components of a real-time streaming project that consumes and processes smartphone sensor data with FastAPI, Kafka, QuestDB, and Docker. That project was a first pass at an architecture that moves streaming data from smartphones, through a Kafka log, and into a time series database where the data can be readily queried and processed. The end product was a dashboard that polled the database and displayed sensor readings in near real-time:
Streaming Smartphone Data with FastAPI, Kafka, QuestDB, and Docker
One criticism of that design was the unnecessary latency introduced by writing data from Kafka to the database and then querying the database to display the most recent sensor readings. When the primary objective is to analyze data in near real-time, writing to and reading from a database becomes an inefficient detour.
This is one of the problems that ksqlDB was created to solve. Instead of writing data to the database and querying it for analysis, ksqlDB enables direct processing and analysis of data streams, eliminating the need to persist data in a database before accessing it.
This article expands on the previous one by introducing ksqlDB for querying and processing streaming data. Unlike traditional database polling, push queries in ksqlDB significantly reduce dashboard latency and simplify the backend infrastructure. All code used to build this project is available on GitHub:
GitHub – hfhoffman1144/smartphone_sensor_stream2: Stream smartphone sensor data with FastAPI…
The End Product
The objective of this project is the same as before: to develop a real-time dashboard that visualizes sensor data. However, in this iteration, our focus is on minimizing the perceptible latency between the phone and the dashboard by harnessing the power of ksqlDB. Here’s what the new dashboard should look like:

Tri-axial accelerometer data from the smartphone is sent to a FastAPI app, written to Kafka, queried with ksqlDB, and displayed in the dashboard. Notice how quickly the plot responds to the phone’s movement – the delay is nearly undetectable.
This project also supports streaming from multiple smartphones:

Project Architecture
The architecture for this project is simpler than before because QuestDB and its consumer are no longer required to get data to the dashboard.

Each smartphone sends sensor readings (accelerometer, gyroscope, and magnetometer) via POST request to a FastAPI application (the producer). The producer reformats the request body to a ksqlDB-compatible JSON format and asynchronously writes sensor readings to a Kafka topic. Once sensor data arrives in the Kafka topic, it can be readily queried with ksqlDB.
To obtain a continuous stream of sensor data, the client can establish a server-sent event (SSE) connection with the backend (a FastAPI application). The backend initiates a push query through a ksqlDB API that continuously sends sensor data to the frontend.
Here’s the directory for the project:
├── dashboard_backend
│   ├── Dockerfile
│   ├── app
│   │   ├── core
│   │   │   ├── config.py
│   │   │   └── utils.py
│   │   ├── db
│   │   │   └── data_api.py
│   │   ├── main.py
│   │   └── models
│   │       └── sensors.py
│   ├── entrypoint.sh
│   └── requirements.txt
├── dashboard_frontend
│   ├── Dockerfile
│   ├── app
│   │   ├── main.py
│   │   ├── static
│   │   │   └── js
│   │   │       └── main.js
│   │   └── templates
│   │       └── index.html
│   ├── entrypoint.sh
│   └── requirements.txt
├── producer
│   ├── Dockerfile
│   ├── app
│   │   ├── __init__.py
│   │   ├── core
│   │   │   ├── config.py
│   │   │   └── utils.py
│   │   ├── main.py
│   │   └── schemas
│   │       └── sensors.py
│   ├── entrypoint.sh
│   └── requirements.txt
├── docker-compose.yml
Three FastAPI applications are written to facilitate data flow and visualization: the producer, the dashboard frontend, and the dashboard backend. These apps, along with Kafka and ksqlDB, are orchestrated via docker-compose:
version: '3.8'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    ports:
      - 9092:9092
      - 9093:9093
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
  ksqldb-server:
    image: confluentinc/ksqldb-server:latest
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_STREAMS_COMMIT_INTERVAL_MS: 100
      KSQL_KSQL_IDLE_CONNECTION_TIMEOUT_SECONDS: 600
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:latest
    container_name: ksqldb-cli
    depends_on:
      - kafka
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
  producer:
    build:
      context: ./producer
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 8000
    ports:
      - 8000:8000
    env_file:
      - .env
    depends_on:
      - kafka
      - zookeeper
  dashboard_backend:
    build:
      context: ./dashboard_backend
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 5000
    ports:
      - 5000:5000
    env_file:
      - .env
    depends_on:
      - ksqldb-server
  dashboard_frontend:
    build:
      context: ./dashboard_frontend
      dockerfile: Dockerfile
    command: uvicorn main:app --workers 1 --host 0.0.0.0 --port 4200
    ports:
      - 4200:4200
    env_file:
      - .env
    depends_on:
      - dashboard_backend
Notice the four services not explicitly written in code (thankfully): Zookeeper, Kafka, the ksqlDB server, and the ksqlDB CLI. These services work together with the producer and dashboard to create the project. Let's explore these components in more detail.
The Producer
Similar to before, the producer is a FastAPI app that accepts data sent from smartphones (over POST request) and writes to a Kafka log. Here’s the directory structure:
producer
├── Dockerfile
├── app
│   ├── __init__.py
│   ├── core
│   │   ├── config.py
│   │   └── utils.py
│   ├── main.py
│   └── schemas
│       └── sensors.py
├── entrypoint.sh
└── requirements.txt
We won’t go through every file in the producer directory since everything is available on GitHub. Instead, let’s take a look at main.py (the driving script of the producer API):
# producer/app/main.py
import json
from fastapi import FastAPI
import asyncio
from aiokafka import AIOKafkaProducer
from schemas.sensors import SensorReading, SensorResponse
from core.config import app_config
from core.utils import flatten_dict
from loguru import logger

# Instantiate FastAPI app
app = FastAPI(title=app_config.PROJECT_NAME)

# Create the event loop to use async programming
loop = asyncio.get_event_loop()

# Instantiate the Kafka producer object
producer = AIOKafkaProducer(
    loop=loop,
    client_id=app_config.PROJECT_NAME,
    bootstrap_servers=app_config.KAFKA_URL
)


@app.on_event("startup")
async def startup_event():
    await producer.start()
    await producer.send(app_config.TOPIC_NAME, json.dumps({'status':'ready'}).encode("ascii"))


@app.on_event("shutdown")
async def shutdown_event():
    await producer.stop()


@app.post("/phone-producer/")
async def kafka_produce(data: SensorReading):
    """
    Produce a message containing readings from a smartphone sensor to Kafka.

    Parameters
    ----------
    data : SensorReading
        The request body containing sensor readings and metadata.

    Returns
    -------
    response : SensorResponse
        The response body corresponding to the processed sensor readings
        from the request.
    """
    # Extract the messageId, deviceId, and sessionId
    message_info = data.dict().copy()
    message_info.pop('payload')

    # Write each sensor reading in the payload to Kafka
    for sensor_reading in data.dict()['payload']:
        kafka_message = {**flatten_dict(sensor_reading), **message_info}
        await producer.send(app_config.TOPIC_NAME,
                            json.dumps(kafka_message).encode("ascii"))

    response = SensorResponse(
        messageId=data.messageId,
        sessionId=data.sessionId,
        deviceId=data.deviceId
    )
    logger.info(response)

    return response
The explanation of this code is largely the same as in the previous article. The main difference is that sensor readings in the request payload need to be reformatted before they are written to Kafka. This new format allows the sensor data to be queried in a SQL-like fashion by ksqlDB. Each POST request from the phone sends JSON data that looks similar to this:
{"messageId": 20,
"sessionId": "4bf3b3b9-a241-4aaa-b1d3-c05100df9976",
"deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b",
"payload": [{"name": "accelerometeruncalibrated",
"time": "1671406719721160400",
"values": {"z": -0.9372100830078125,
"y": -0.3241424560546875,
"x": 0.0323486328125}},
{"name": "magnetometeruncalibrated",
"time": "1671406719726579500",
"values": {"z": -5061.64599609375,
"y": 591.083251953125,
"x": 3500.541015625}},
{"name": "gyroscopeuncalibrated",
"time": "1671406719726173400",
"values": {"z": -0.004710599314421415,
"y": -0.013125921599566936,
"x": 0.009486978873610497}},
...
]}
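This request body maps directly onto the Pydantic schemas the producer imports from schemas/sensors.py. The exact definitions live on GitHub; the sketch below is an approximation inferred from the payload above:
# An approximate sketch of producer/app/schemas/sensors.py, inferred from the
# request body shown above; the exact definitions are on GitHub.
from typing import List
from pydantic import BaseModel

class SensorValues(BaseModel):
    x: float
    y: float
    z: float

class RawSensorReading(BaseModel):
    name: str
    time: str
    values: SensorValues

class SensorReading(BaseModel):
    messageId: int
    sessionId: str
    deviceId: str
    payload: List[RawSensorReading]

class SensorResponse(BaseModel):
    messageId: int
    sessionId: str
    deviceId: str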
Individual sensor readings are located under "payload" and are written to Kafka in the kafka_produce() route:
# Extract the messageId, deviceId, and sessionId
message_info = data.dict().copy()
message_info.pop('payload')

# Write each sensor reading in the payload to Kafka
for sensor_reading in data.dict()['payload']:
    kafka_message = {**flatten_dict(sensor_reading), **message_info}
    await producer.send(app_config.TOPIC_NAME,
                        json.dumps(kafka_message).encode("ascii"))
The flatten_dict() function located in producer/app/core/utils.py takes a raw sensor message in the payload, for instance:
{
  "name": "accelerometeruncalibrated",
  "time": "1683555956851304200",
  "values": {
    "z": -1.0012664794921875,
    "y": -0.467315673828125,
    "x": -0.00494384765625
  }
}
and reformats it to be compatible with a ksqlDB schema. This is what gets written to Kafka:
{
  "name": "accelerometeruncalibrated",
  "time": "1683555956851304200",
  "values_z": -1.0012664794921875,
  "values_y": -0.467315673828125,
  "values_x": -0.00494384765625,
  "messageId": 35,
  "sessionId": "c931f349-faf5-4e45-b09f-c623a76ef93a",
  "deviceId": "86a5b0e3-6e06-40e2-b226-5a72bd39b65b"
}
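The flattening itself only takes a few lines. The actual implementation lives in producer/app/core/utils.py on GitHub; the version below is a minimal sketch of the idea:
# A minimal sketch of flatten_dict(); the real implementation lives in
# producer/app/core/utils.py. Nested keys are joined with an underscore,
# so {"values": {"z": 1.0}} becomes {"values_z": 1.0}.
def flatten_dict(d: dict, parent_key: str = "", sep: str = "_") -> dict:
    items = {}
    for key, value in d.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse into nested dictionaries
            items.update(flatten_dict(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items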
Each entry in the reformatted sensor reading can be thought of as a column that can be queried by ksqlDB. More on this in the next section.
ksqlDB
At this point, sensor readings can flow from smartphones to the FastAPI producer, where they are written to Kafka in a ksqlDB-compatible format. ksqlDB can then query recent and historical data in Kafka.
ksqlDB is an open-source streaming engine designed to process, analyze, and transform real-time data streams from Kafka using a SQL-like syntax. Put simply, ksqlDB enables interaction with data in Kafka topics using familiar relational database concepts like tables, queries, materialized views, joins, and aggregations. The capabilities of ksqlDB are extensive and won’t be fully covered in this article, but the documentation provides good definitions of the core concepts.
For this project, a stream is created over the topic that stores smartphone sensor readings (the data written by the producer):
CREATE STREAM smartphone_sensor_stream (
    name VARCHAR,
    time BIGINT,
    values_x DOUBLE,
    values_y DOUBLE,
    values_z DOUBLE,
    messageId BIGINT,
    sessionId VARCHAR,
    deviceId VARCHAR
) WITH (
    KAFKA_TOPIC = 'smartphone-sensor-data',
    VALUE_FORMAT = 'JSON'
);
The above ksqlDB statement creates a stream, smartphone_sensor_stream, that can be used to query sensor readings written to the smartphone-sensor-data Kafka topic. Libraries like ksql-python can be leveraged to interface with the ksqlDB REST API and programmatically execute queries:
from ksql import KSQLAPI  # pip install ksql

# Where ksqldb is running
KSQL_URL = "http://localhost:8088"

# Instantiate the ksqldb API object
client = KSQLAPI(KSQL_URL)

# Create the "smartphone_sensor_stream" stream over the specified topic
client.create_stream(table_name="smartphone_sensor_stream",
                     columns_type=["name varchar",
                                   "time bigint",
                                   "values_x double",
                                   "values_y double",
                                   "values_z double",
                                   "messageId bigint",
                                   "sessionId varchar",
                                   "deviceId varchar"
                                   ],
                     topic="smartphone-sensor-data",
                     value_format="JSON")
A push query is executed to retrieve sensor readings as they are written to the topic. In essence, a push query opens a long-lived connection that sends updates to a client any time new data is received in the topic. This makes push queries a good choice for streaming smartphone data.
select deviceId,
       time,
       values_x,
       values_y,
       values_z
from smartphone_sensor_stream
where name = 'accelerometeruncalibrated'
emit changes
The above query "pushes" the device ID, time, and accelerometer values from smartphone_sensor_stream every time the stream is updated with data. This can be executed with ksql-python:
from ksql import KSQLAPI
from typing import Generator

# Where ksqldb is running
KSQL_URL = "http://localhost:8088"

# Instantiate the ksqldb API object
client = KSQLAPI(KSQL_URL)

# Write a push query
push_query = '''select deviceId,
                       time,
                       values_x,
                       values_y,
                       values_z
                from smartphone_sensor_stream
                where name = 'accelerometeruncalibrated'
                emit changes
             '''

# Get the KSQL stream generator
sensor_push_stream: Generator = client.query(push_query, use_http2=True)

# Loop through messages in the generator and print them as they're received
for raw_message in sensor_push_stream:
    print(raw_message)
The ksql-python client returns a generator object that yields messages as they are written to Kafka and read from the stream. Unlike a conventional for loop that iterates over a fixed-size array, this loop will continue executing as long as data is received in the stream.
The messages yielded from the query look similar to the following:
[
    '86a5b0e3-6e06-40e2-b226-5a72bd39b65b',  # Device ID
    1684615020438850600,                     # Timestamp of the sensor recording
    0.993927001953125,                       # x value
    -0.5736083984375,                        # y value
    -0.1787261962890625                      # z value
]
One message is returned at a time, each of which can be thought of as a row in the smartphone_sensor_stream ksqlDB stream. Keep in mind that ksqlDB can perform more complex queries such as aggregates and joins, but for this project, only a basic select is needed.
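To give a sense of what that could look like (this aggregation is not part of the project), here is a hedged sketch that counts accelerometer readings per device over five-second tumbling windows, using the same ksql-python client and the smartphone_sensor_stream stream created above:
from ksql import KSQLAPI

# Illustrative only; assumes ksqlDB is running locally and the
# smartphone_sensor_stream stream already exists.
client = KSQLAPI("http://localhost:8088")

# Count accelerometer readings per device over 5-second tumbling windows
windowed_counts_query = '''select deviceId,
                                  count(*) as num_readings
                           from smartphone_sensor_stream
                           window tumbling (size 5 seconds)
                           where name = 'accelerometeruncalibrated'
                           group by deviceId
                           emit changes
                        '''

# Each yielded message is a windowed row: device ID and its reading count
for row in client.query(windowed_counts_query, use_http2=True):
    print(row)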
Dashboard Backend
The dashboard backend is a FastAPI app that accepts SSE requests for sensor data streams. Once an SSE connection is requested, a ksqlDB push query is opened and continuously sends messages to the frontend as they arrive in Kafka. The directory structure for the dashboard backend looks like this:
dashboard_backend
├── Dockerfile
├── app
│   ├── core
│   │   ├── config.py
│   │   └── utils.py
│   ├── db
│   │   └── data_api.py
│   ├── main.py
│   └── models
│       └── sensors.py
├── entrypoint.sh
└── requirements.txt
Let’s first take a look at data_api.py – the interface between ksqlDB and the dashboard backend:
# data_api.py
from retry import retry
from ksql import KSQLAPI
from models.sensors import SensorName
from typing import Generator


@retry()
def create_ksql_connection(url: str) -> KSQLAPI:
    """
    Create a connection to a KSQL server using the provided URL.

    Parameters
    ----------
    url : str
        The URL of the KSQL server to connect to.

    Returns
    -------
    KSQLAPI
        An instance of the `KSQLAPI` class representing the connection
        to the KSQL server.
    """
    return KSQLAPI(url)


def create_ksql_device_stream(client: KSQLAPI,
                              stream_name: str,
                              topic_name: str) -> None:
    """
    Creates a new device stream in KSQL server if it does not already exist.

    Parameters:
    -----------
    client : KSQLAPI
        A client instance of the KSQLAPI class to connect with KSQL server.
    stream_name : str
        The name of the device stream to create.
    topic_name : str
        The name of the Kafka topic to associate with the device stream.

    Returns:
    --------
    None

    Raises:
    -------
    KSQLServerError
        If there is an error while creating the stream in KSQL server.
    """
    # Get the current streams
    curr_streams = client.ksql('show streams')
    curr_stream_names = [stream['name'].lower()
                         for stream in curr_streams[0]['streams']]

    # If the device stream doesn't exist, create it
    if stream_name.lower() not in curr_stream_names:
        client.create_stream(table_name=stream_name,
                             columns_type=['name varchar',
                                           'time bigint',
                                           'values_x double',
                                           'values_y double',
                                           'values_z double',
                                           'messageId bigint',
                                           'sessionId varchar',
                                           'deviceId varchar'
                                           ],
                             topic=topic_name,
                             value_format='JSON')


def ksql_sensor_push(client: KSQLAPI,
                     stream_name: str,
                     sensor_name: SensorName) -> Generator:
    """
    Generator function that continuously pushes sensor data
    for a given sensor name from a KSQL server using the KSQL API client.

    Parameters:
    -----------
    client : KSQLAPI
        The KSQL API client instance used to query the KSQL server.
    stream_name : str
        The name of the KSQL stream to query data from.
    sensor_name : SensorName
        An enum value representing the name of the sensor to stream data for.

    Returns:
    --------
    Generator:
        A generator object that yields the sensor data as it is streamed in real-time.
    """
    push_query = f'''
        select deviceId,
               time,
               values_x,
               values_y,
               values_z
        from {stream_name}
        where name = '{sensor_name.value}'
        emit changes
    '''

    sensor_push_stream: Generator = client.query(push_query, use_http2=True)

    return sensor_push_stream
In this script, create_ksql_device_stream() creates the ksqlDB stream defined in the previous section, and ksql_sensor_push() returns a generator that yields the results of the sensor data push query.
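The SensorName enum imported from models/sensors.py isn't shown here; a plausible sketch, assuming its values mirror the sensor names the producer writes to Kafka, looks like this:
# A plausible sketch of dashboard_backend/app/models/sensors.py; the actual
# definition is on GitHub. The enum values match the "name" field in the
# messages written to Kafka by the producer.
from enum import Enum

class SensorName(Enum):
    ACC = "accelerometeruncalibrated"
    GYRO = "gyroscopeuncalibrated"
    MAG = "magnetometeruncalibrated"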
Let’s break down the components of main.py – the driving script for the dashboard backend. Here are the dependencies:
# main.py
import pandas as pd
import json
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from fastapi.requests import Request
from starlette.middleware.cors import CORSMiddleware
from core.config import app_config
from core.utils import maybe_load_json
from models.sensors import SensorName
from db.data_api import (create_ksql_connection,
                         create_ksql_device_stream,
                         ksql_sensor_push)
The primary imports to note are EventSourceResponse (the class that implements SSE) and the functions from data_api.py. Next, a KSQLAPI object and a FastAPI app are instantiated:
# main.py
...

# Instantiate KSQLAPI object
KSQL_CLIENT = create_ksql_connection(app_config.KSQL_URL)

# Create the KSQL device stream if it doesn't exist
create_ksql_device_stream(
    KSQL_CLIENT, app_config.STREAM_NAME, app_config.TOPIC_NAME
)

# Instantiate FastAPI app
app = FastAPI()

# Configure middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
Lastly, the SSE endpoint that sends sensor data from ksqlDB to the frontend is defined:
# main.py
...

# An SSE endpoint that pushes sensor data from KSQLDB to the client
@app.get("/chart-data/{client_id}")
async def message_stream(request: Request, client_id: str):
    async def event_generator():
        while True:
            # If the client closes the connection, stop sending events
            if await request.is_disconnected():
                break
            try:
                # Get the KSQL stream generator
                sensor_push_stream = ksql_sensor_push(
                    KSQL_CLIENT, app_config.STREAM_NAME, SensorName.ACC)

                for raw_message in sensor_push_stream:
                    # If client closes connection, stop sending events
                    if await request.is_disconnected():
                        break

                    # Check if the raw message is the correct format
                    message = maybe_load_json(raw_message)

                    # If the message is in the correct format (i.e. a list),
                    # send to client
                    if isinstance(message, list):
                        # Format the sensor timestamp
                        message[1] = str(pd.to_datetime(message[1]))

                        # Yield the message as JSON
                        yield {
                            "event": "new_message",
                            "id": "message_id",
                            "retry": 1500000,
                            "data": json.dumps(message)
                        }
            except Exception as e:
                if await request.is_disconnected():
                    break
                continue

    return EventSourceResponse(event_generator())
The message_stream() endpoint accepts GET requests that open long-lived connections through which the backend can continuously send sensor data. Within message_stream(), a coroutine called event_generator() is defined and returned. The purpose of event_generator() is to create a while loop that yields sensor data messages as they are processed by ksqlDB. The only time this loop terminates is when the client closes the connection.
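To sanity-check the endpoint without the frontend, any SSE-capable HTTP client will do. The snippet below is a rough example using httpx's streaming API; the client ID in the URL is arbitrary, and the backend is assumed to be running locally on port 5000 (as configured in docker-compose):
# A rough smoke test for the SSE endpoint; not part of the project code.
import httpx

url = "http://localhost:5000/chart-data/test-client"  # client ID is arbitrary

# Keep the connection open indefinitely (timeout=None) and print each
# "data:" frame as the backend pushes it
with httpx.stream("GET", url, timeout=None) as response:
    for line in response.iter_lines():
        if line.startswith("data:"):
            print(line)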
Dashboard Frontend
The dashboard frontend is an HTML page hosted by a FastAPI application. Its purpose is to demonstrate that data is flowing through the components of this project; it is by no means a comprehensive frontend. The interface looks like this:

When the user clicks "Start Streaming", an SSE connection is opened with the backend, and data from the push query is sent continuously. All code used to create the frontend, along with instructions on how to get everything running, is available on GitHub.
Final Thoughts
This article introduced ksqlDB as a solution to improve the real-time streaming project that was previously built using FastAPI, Kafka, QuestDB, and Docker. The project aimed to create a dashboard that visualizes sensor data in near real-time. One of the challenges faced was the unnecessary latency caused by writing data from Kafka to a database and querying the database for analysis.
ksqlDB, a database purpose-built for stream processing, was implemented to address this issue. Instead of persisting data in a database before accessing it, ksqlDB enables direct processing and analysis of data streams in Kafka. By implementing push queries in ksqlDB, the latency in the dashboard was significantly reduced, resulting in a nearly undetectable delay in displaying sensor readings.
The use of ksqlDB simplified the backend infrastructure and allowed for efficient analysis of sensor readings in near real-time. Future articles will continue to improve this project and enhance its functionality with new tools and features. As always, feedback is greatly appreciated. Thanks for reading!
Become a Member: https://harrisonfhoffman.medium.com/membership
References
Apache Kafka: https://kafka.apache.org/
Event-Driven Architectures – The Queue vs The Log: https://jack-vanlightly.com/blog/2018/5/20/event-driven-architectures-the-queue-vs-the-log
Lucidchart: https://www.lucidchart.com/
Kafka Poc using FastApi: https://github.com/GavriloviciEduard/fastapi-kafka
geo-stream-kafka: https://github.com/iwpnd/geo-stream-kafka
18 Most Popular IoT Devices in 2022: https://www.softwaretestinghelp.com/iot-devices/
FastAPI: https://fastapi.tiangolo.com/
QuestDB: https://questdb.io/docs/
Row vs Column Oriented Databases: https://dataschool.com/data-modeling-101/row-vs-column-oriented-databases/