Dead Letter Queue (DLQ) in Kafka
Introduction to Kafka DLQ and its implementation in Python
A Dead Letter Queue (DLQ) is a secondary Kafka topic that receives the messages a Kafka Consumer failed to process because of errors such as failed deserialization or an invalid message format.
Installation
There are various libraries in Python that can be used to connect to a Kafka Cluster. I’ll be using kafka-python to connect to the Kafka Cluster and to create the Kafka Producer and Consumer clients.
Install kafka-python using pip:
pip install kafka-python
Implementation
We will first import the necessary packages and define the bootstrap servers, primary topic name, and DLQ topic name to create instances of Kafka Producer and Consumer.
from kafka import KafkaProducer, KafkaConsumer
import json
bootstrap_servers = ['localhost:9092']
primary_topic = 'primary-topic-name'
dlq_topic = 'dlq-topic-name'
Now, let's create a Producer for the DLQ Topic, where the malformed messages will be sent.
dlq_producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    value_serializer=lambda x: x.encode('utf-8'),
    acks='all'
)
Next, we will create a Kafka Consumer to consume messages from the primary topic.
consumer = KafkaConsumer(
    primary_topic,
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='latest',
    enable_auto_commit=True,
    value_deserializer=lambda x: x.decode('utf-8')
)
Now, we will wrap the message processing in a try-except block. If a received message is not valid JSON, or any other exception occurs while processing it, the message is sent to the DLQ topic.
for msg in consumer:
    print(f'\nReceived:\nPartition: {msg.partition} \tOffset: {msg.offset}\tValue: {msg.value}')
    try:
        data = json.loads(msg.value)
        print('Data Received:', data)
    except Exception:
        # Any processing failure (for example, invalid JSON) routes the message to the DLQ
        print(f'Value {msg.value} not in JSON format')
        dlq_producer.send(dlq_topic, value=msg.value)
        print('Message sent to DLQ Topic')
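The routing decision in the loop above can also be pulled into a small function, which makes it easy to try out without a running Kafka cluster. The following is only a minimal sketch: the function name process_or_dead_letter and the send_to_dlq callback are hypothetical names introduced for illustration, and a plain list stands in for the DLQ producer.

```python
import json
from typing import Any, Callable, Tuple


def process_or_dead_letter(
    raw_value: str,
    send_to_dlq: Callable[[str], Any],
) -> Tuple[bool, Any]:
    """Try to parse raw_value as JSON.

    Returns (True, parsed_data) on success. On failure, passes the raw
    value to send_to_dlq (a hypothetical callback) and returns (False, None).
    """
    try:
        return True, json.loads(raw_value)
    except (json.JSONDecodeError, TypeError):
        send_to_dlq(raw_value)
        return False, None


# Stand-in for the DLQ producer: collect dead-lettered values in a list.
dead_letters = []
results = [
    process_or_dead_letter(value, dead_letters.append)
    for value in ['{"id": 1}', 'not json', '[1, 2, 3]']
]

print(results)       # [(True, {'id': 1}), (False, None), (True, [1, 2, 3])]
print(dead_letters)  # ['not json']
```

With the real clients, the callback could be something like lambda v: dlq_producer.send(dlq_topic, value=v), so the function itself never needs to know about Kafka.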
Conclusion
A DLQ topic isolates malformed messages so that they can be identified without blocking or disturbing the processing of other messages. Because the failed messages are preserved in their own topic, they can also be analyzed later and used for reporting.
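To make later analysis easier, one option is to send the failed message together with some context about the failure, instead of the raw value alone. The sketch below is an assumption on my part, not something the code above does: the helper build_dlq_record and its field names are hypothetical, and the topic, partition, and offset values are made up for the example.

```python
import json


def build_dlq_record(value, error, topic, partition, offset):
    """Wrap a failed message and its error details in a JSON string.

    The field names are illustrative only; pick whatever suits your reporting.
    """
    return json.dumps({
        'original_value': value,
        'error_type': type(error).__name__,
        'error_message': str(error),
        'source_topic': topic,
        'source_partition': partition,
        'source_offset': offset,
    })


# Simulate a failed parse and build the record that would go to the DLQ.
try:
    json.loads('not json')
except json.JSONDecodeError as exc:
    record = build_dlq_record('not json', exc, 'primary-topic-name', 0, 42)

print(record)
```

Since the result is a plain string, it could be passed to dlq_producer.send(dlq_topic, value=record) with the same value_serializer used earlier.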
Resources
The code snippets of this article are available on my GitHub page.
Let’s Connect
LinkedIn: https://www.linkedin.com/in/jimit105/
GitHub: https://github.com/jimit105
Twitter: https://twitter.com/jimit105