Dead Letter Queue (DLQ) in Kafka

Introduction to Kafka DLQ and its implementation in Python

Jimit Dholakia
Towards Data Science
Jan 28, 2021

Image by DaKub from Pixabay

A Dead Letter Queue (DLQ) is a secondary Kafka topic that receives the messages the Kafka consumer failed to process because of errors such as failed deserialization or an improper message format.

Image by the author (Jimit Dholakia)
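To make the idea concrete without a running cluster, here is a minimal in-memory sketch of the pattern. It assumes plain Python lists stand in for the primary topic and the DLQ topic (the names `primary_messages`, `processed`, and `dead_letters` are hypothetical). A message that fails to decode or parse is set aside, and the loop carries on with the next one.

```python
import json

# Hypothetical stand-ins: a list plays the role of the primary topic,
# and two lists collect successfully processed and dead-lettered messages.
primary_messages = [b'{"id": 1}', b'not json', b'{"id": 2}', b'\xff\xfe']
processed = []
dead_letters = []

for raw in primary_messages:
    try:
        # Decoding and parsing both happen inside the try block,
        # so either kind of failure routes the message to the "DLQ".
        processed.append(json.loads(raw.decode('utf-8')))
    except (UnicodeDecodeError, json.JSONDecodeError):
        dead_letters.append(raw)

print(processed)     # [{'id': 1}, {'id': 2}]
print(dead_letters)  # [b'not json', b'\xff\xfe']
```

The valid message after the malformed one is still processed, which is the point of the pattern: one bad message does not block the rest of the stream.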

Installation

There are several Python libraries that can be used to connect to a Kafka cluster. Some of them are:

  1. kafka-python
  2. confluent-kafka
  3. PyKafka

I’ll use kafka-python to connect to the Kafka cluster and to create the Kafka producer and consumer clients.

Install kafka-python using pip:

pip install kafka-python

Implementation

We first import the necessary packages and define the bootstrap servers, the primary topic name, and the DLQ topic name, which we need to create the Kafka producer and consumer instances.

from kafka import KafkaProducer, KafkaConsumer
import json
bootstrap_servers = ['localhost:9092']
primary_topic = 'primary-topic-name'
dlq_topic = 'dlq-topic-name'

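Next, let's create a producer for the DLQ topic, to which the malformed messages will be sent.

dlq_producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    # Encode string values as UTF-8 bytes before sending
    value_serializer=lambda x: x.encode('utf-8'),
    # Wait for all in-sync replicas to acknowledge each write
    acks='all'
)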

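Then we create a Kafka consumer to read messages from the primary topic.

consumer = KafkaConsumer(
    primary_topic,
    bootstrap_servers=bootstrap_servers,
    # Start from the newest messages when no committed offset exists
    auto_offset_reset='latest',
    # Commit consumed offsets periodically in the background
    enable_auto_commit=True,
    # Decode raw bytes into a UTF-8 string
    value_deserializer=lambda x: x.decode('utf-8')
)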
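The serializer and deserializer passed to the clients are ordinary functions, so they can be checked without a broker. The sketch below reproduces the two lambdas from the snippets above as named functions (`serialize` and `deserialize` are names chosen here for illustration) and shows one caveat: a value that is not valid UTF-8 makes the deserializer itself raise `UnicodeDecodeError`. kafka-python applies `value_deserializer` while it fetches records, before the loop body runs, so such a message would most likely raise out of the `for` loop rather than reach the try-except block. Dropping `value_deserializer` and decoding inside the try block is one way to send those messages to the DLQ as well.

```python
def serialize(x: str) -> bytes:
    # Same logic as the DLQ producer's value_serializer lambda
    return x.encode('utf-8')


def deserialize(x: bytes) -> str:
    # Same logic as the consumer's value_deserializer lambda
    return x.decode('utf-8')


# A well-formed text value survives the round trip unchanged.
round_trip_ok = deserialize(serialize('{"id": 1}')) == '{"id": 1}'

# Bytes that are not valid UTF-8 make the deserializer itself fail.
try:
    deserialize(b'\xff\xfe')
    decode_failed = False
except UnicodeDecodeError:
    decode_failed = True

print(round_trip_ok, decode_failed)  # True True
```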

Finally, we wrap the processing logic in a try-except block. If a received message is not valid JSON, or any other exception occurs while processing it, the message is sent to the DLQ topic.

for msg in consumer:
    print(f'\nReceived:\nPartition: {msg.partition} \tOffset: {msg.offset}\tValue: {msg.value}')

    try:
        # Try to parse the message value as JSON
        data = json.loads(msg.value)
        print('Data Received:', data)

    except Exception:
        # Any processing failure routes the original value to the DLQ topic
        print(f'Value {msg.value} not in JSON format')
        dlq_producer.send(dlq_topic, value=msg.value)
        print('Message sent to DLQ Topic')
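One limitation of the loop above is that a DLQ message carries no record of why it failed or where it came from. Below is a minimal sketch of a more informative handler. It assumes raw `bytes` values (a consumer created without `value_deserializer`), decodes inside the try block, and attaches the error and the source position as Kafka headers. The function name `process_message`, the `send_to_dlq` callback, and the header names are hypothetical choices for illustration. Passing the sender in as a callback keeps the logic testable without a broker.

```python
import json
from typing import Callable, List, Optional, Tuple

# A Kafka header is a (str key, bytes value) pair; kafka-python's
# KafkaProducer.send() accepts a list of these via its `headers` argument.
Headers = List[Tuple[str, bytes]]


def process_message(
    raw: bytes,
    partition: int,
    offset: int,
    send_to_dlq: Callable[[bytes, Headers], None],
) -> Optional[dict]:
    """Parse a raw message; on failure, forward it to the DLQ and return None."""
    try:
        # Decoding happens here, inside the try block, so invalid UTF-8
        # is routed to the DLQ just like invalid JSON.
        return json.loads(raw.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # Hypothetical header names describing the failure and its origin.
        headers = [
            ('error-type', type(exc).__name__.encode('utf-8')),
            ('error-message', str(exc).encode('utf-8')),
            ('source-partition', str(partition).encode('utf-8')),
            ('source-offset', str(offset).encode('utf-8')),
        ]
        send_to_dlq(raw, headers)
        return None


# Usage with a fake sender that records what would be published.
sent = []
result_ok = process_message(b'{"id": 1}', 0, 10, lambda v, h: sent.append((v, h)))
result_bad = process_message(b'oops', 0, 11, lambda v, h: sent.append((v, h)))
print(result_ok)      # {'id': 1}
print(result_bad)     # None
print(sent[0][1][0])  # ('error-type', b'JSONDecodeError')
```

With a real broker, the callback could be something like `lambda v, h: producer.send(dlq_topic, value=v, headers=h)`, where `producer` is a `KafkaProducer` created without a `value_serializer` so that it accepts raw bytes (the `dlq_producer` defined earlier encodes strings). Kafka headers require brokers at version 0.11 or later.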

Conclusion

A DLQ topic lets us isolate malformed messages without interrupting the processing of valid ones. The messages collected in the DLQ topic can later be analyzed and used for reporting.
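As a small illustration of the reporting use case, the sketch below summarizes dead-lettered records by error type. It assumes each DLQ record carries a hypothetical `error-type` header (the producer code above does not add one) and uses plain tuples in place of records read back from the DLQ topic.

```python
from collections import Counter
from typing import List, Tuple

# Hypothetical DLQ records as (value, headers) pairs; headers are
# (str, bytes) tuples, which is how kafka-python represents Kafka headers.
dlq_records: List[Tuple[bytes, List[Tuple[str, bytes]]]] = [
    (b'oops', [('error-type', b'JSONDecodeError')]),
    (b'\xff', [('error-type', b'UnicodeDecodeError')]),
    (b'{bad', [('error-type', b'JSONDecodeError')]),
    (b'???', []),  # a record without the header
]


def error_report(records) -> Counter:
    """Count DLQ records per error type, using 'unknown' if the header is missing."""
    counts = Counter()
    for _value, headers in records:
        error_type = dict(headers).get('error-type', b'unknown').decode('utf-8')
        counts[error_type] += 1
    return counts


for error_type, count in error_report(dlq_records).most_common():
    print(f'{error_type}: {count}')
# JSONDecodeError: 2
# UnicodeDecodeError: 1
# unknown: 1
```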

Resources

The code snippets of this article are available on my GitHub page.

Let’s Connect

LinkedIn: https://www.linkedin.com/in/jimit105/
GitHub: https://github.com/jimit105
Twitter: https://twitter.com/jimit105