Replicate Avro Messages To Target — Part 1 (Conflicting Schema Exists On Target)

Is it ok to copy the topic with Replicator when the target schema registry has the same schema ID but with a different schema definition?


Photo by Oskar Yildiz on Unsplash

In this article, we are going to look at how Confluent Replicator behaves when it is asked to copy a topic from a source Kafka cluster to a target Kafka cluster, where source and target each have their own schema registry. Specifically, we will see what happens when the same schema ID already exists on the target schema registry instance, but with a different schema definition.

Before covering this scenario, there are certain knowledge and environment assumptions that I’m going to make:

Knowledge:

  1. A basic understanding of the Kafka ecosystem: brokers, ZooKeeper, Schema Registry
  2. Familiarity with the Kafka Connect framework and Replicator as a connector

Environment:

  1. Source and target Confluent Kafka clusters are up and running, each including a broker and ZooKeeper. I have used Confluent version 5.4.
  2. Source and target each have their own schema registry cluster.
  3. The target cluster has a Connect service running, which will host the Replicator connector.
  4. Source and target have network connectivity.
  5. zookeeper-1, broker-1, and schema-registry-1 make up our source environment; zookeeper-2, broker-2, schema-registry-2, and connect-2 make up our target environment.
Image by Author, inspired by Confluent. It shows the source and target environments, with Replicator running on the target side to pull messages from the source.

Practical Scenario:

Let's assume you want to replicate the “bar” topic, which contains Avro messages with schema ID “1” embedded in them. On the target side, you have a separate schema-registry instance that already has schema ID “1” registered, but that schema is completely different.

We will cover this step by step:

1. Produce messages on source Kafka

We are going to use the Avro console producer to produce Avro messages to the “bar” topic on the source Kafka cluster.

/usr/bin/kafka-avro-console-producer \
--broker-list broker-1:9092 --topic bar \
--property schema.registry.url=http://schema-registry-1:8081 \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string"}]}'
{"f1":"string1","f2":"string2"}
{"f1":"string1","f2":"string2"}
{"f1":"string1","f2":"string2"}
^C

Here we are producing 3 messages with a value schema of two fields, “f1” and “f2”, both of string data type.

If we try to consume these messages on the source itself, we will see all 3 messages. More importantly, we will also see that these messages have schema ID 1 embedded.

/usr/bin/kafka-avro-console-consumer \
--property print.schema.ids=true \
--property schema.id.separator=: \
--property schema.registry.url=http://schema-registry-1:8081 \
--bootstrap-server broker-1:9092 --topic bar --from-beginning
{"f1":"string1","f2":"string2"}:1
{"f1":"string1","f2":"string2"}:1
{"f1":"string1","f2":"string2"}:1

Since this is the first time we are producing on the source side, the schema gets ID 1. Schema IDs are monotonically increasing and start at 1. The idea here was to have schema ID 1 present on both source and target.
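To make this concrete, registering the same schema against an empty registry via the REST API returns ID 1. The call below is illustrative; the Avro console producer did this registration for us automatically:

curl -s -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema":"{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"string\"}]}"}' \
http://schema-registry-1:8081/subjects/bar-value/versions
{"id":1}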

2. Inspect schema ids on both source and target

Let's quickly inspect what schemas we see in our source and target schema registries.

Source:

curl -s -X GET http://schema-registry-1:8081/subjects/bar-value/versions
[1]
curl -s -X GET http://schema-registry-1:8081/subjects/bar-value/versions/1
{"subject":"bar-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"string\"}]}"}

I’m using the topic name subject strategy, which is why the subject name is “bar-value”, where “bar” is the topic name. The “bar-value” subject has version 1 registered, with an Avro schema consisting of two fields, “f1” and “f2”, both with string data types.

Target:

curl -s -X GET http://schema-registry-2:8081/subjects/bar-value/versions/
{"error_code":40401,"message":"Subject not found."}
curl -s -X GET http://schema-registry-2:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"firstrecord\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phone\",\"type\":\"int\"},{\"name\":\"housenumber\",\"type\":\"int\"},{\"name\":\"distance\",\"type\":\"int\"}]}"}

On the target, we don't have a subject “bar-value”, but we do have a schema with an ID of “1”. This schema has four fields, “age”, “phone”, “housenumber”, and “distance”, all with “int” as the data type: a completely different schema from the one we saw above on the source side under schema ID “1”.

3. Run replicator on target

Deploy Replicator as a connector on the Connect cluster in the target environment.

Posting replicator Config using ByteArrayConverter
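A minimal sketch of the config posted to the Connect REST API might look like the following (property values such as the consumer group ID and replication factor are illustrative):

curl -s -X PUT -H "Content-Type: application/json" \
http://connect-2:8083/connectors/bar-replicator/config \
--data '{
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"src.kafka.bootstrap.servers": "broker-1:9092",
"dest.kafka.bootstrap.servers": "broker-2:9092",
"topic.whitelist": "bar",
"src.consumer.group.id": "bar-replicator-group",
"key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"value.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
"confluent.topic.replication.factor": "1"
}'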

As you might have noticed in the connector’s config, we are using the “ByteArrayConverter” class for the key.converter and value.converter properties. The rest are specific to the “src” (source) and “dest” (target) environments.

Check the connector’s status

curl -s -X GET http://connect-2:8083/connectors/bar-replicator/status | python -m json.tool
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "x.x.x.x:8083"
    },
    "name": "bar-replicator",
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "x.x.x.x:8083"
        }
    ],
    "type": "source"
}

4. Consume messages on target

Run the Avro console consumer to check the messages on the “bar” topic on the target Kafka cluster. We will see that the messages on the target topic have schema ID “1” embedded.

/usr/bin/kafka-avro-console-consumer \
--property print.schema.ids=true \
--property schema.id.separator=: \
--property schema.registry.url=http://schema-registry-2:8081 \
--bootstrap-server broker-2:9092 --topic bar --from-beginning
{"age":7,"phone":-58,"housenumber":58,"distance":57}:1
{"age":7,"phone":-58,"housenumber":58,"distance":57}:1
{"age":7,"phone":-58,"housenumber":58,"distance":57}:1

This is depicted in the following diagram:

Image by author

The console consumer consumed all 3 messages, but those messages now have a totally different meaning. As we saw, they refer to schema ID “1”, but as noted above, schema ID “1” on the target schema-registry cluster means something different. The consumer on the target cluster therefore failed to give us the actual messages that were produced on the source cluster. The problem is not with the messages themselves but with the schema ID they refer to.

We have now learned that whenever Replicator copies messages from source to target, it copies them as-is, without altering the schema ID embedded in them.
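You can see this at the byte level: the Confluent Avro wire format prefixes each serialized message with a magic byte (0x00) followed by the schema ID as a 4-byte big-endian integer, with the Avro-encoded payload after that. A quick, illustrative way to inspect a replicated message on the target:

# Dump the raw bytes of the first message on the target topic
kafka-console-consumer \
--bootstrap-server broker-2:9092 --topic bar \
--from-beginning --max-messages 1 | xxd | head -1
00000000: 0000 0000 010e 7374 7269 6e67 310e 7374  ......string1.st

The first five bytes (00 00 00 00 01) are the magic byte plus the big-endian schema ID 1, exactly as produced on the source.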

Since we used the “io.confluent.connect.replicator.util.ByteArrayConverter” class for our Replicator, let's change it to AvroConverter and see whether we get similar behavior and results.

5. Delete “bar” topic and replicator connector on the target environment

Let's delete the “bar” topic on the target cluster, because we want to see how AvroConverter behaves starting from a clean state.

Delete topic

kafka-topics --delete --topic bar --zookeeper zookeeper-2:2181

Delete connector

curl -s -X DELETE http://connect-2:8083/connectors/bar-replicator
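Listing the connectors afterwards confirms it is gone (assuming no other connectors are running on this Connect cluster):

curl -s -X GET http://connect-2:8083/connectors
[]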

6. Deploy new connector

This time we are going to use the “io.confluent.connect.avro.AvroConverter” class for both the key and value converter properties.

Deploy the connector with the following config (the connector has a new name and a new src.consumer.group.id).

Posting replicator with Avro converter
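A sketch of the AvroConverter-based config might look like the following (the connector name and group ID shown are illustrative):

curl -s -X PUT -H "Content-Type: application/json" \
http://connect-2:8083/connectors/bar-replicator-avro/config \
--data '{
"connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
"src.kafka.bootstrap.servers": "broker-1:9092",
"dest.kafka.bootstrap.servers": "broker-2:9092",
"topic.whitelist": "bar",
"src.consumer.group.id": "bar-replicator-avro-group",
"src.key.converter": "io.confluent.connect.avro.AvroConverter",
"src.key.converter.schema.registry.url": "http://schema-registry-1:8081",
"src.value.converter": "io.confluent.connect.avro.AvroConverter",
"src.value.converter.schema.registry.url": "http://schema-registry-1:8081",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry-2:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry-2:8081",
"confluent.topic.replication.factor": "1"
}'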

7. Run console Avro consumer

Run the Avro console consumer once again.

/usr/bin/kafka-avro-console-consumer \
--property print.schema.ids=true \
--property schema.id.separator=: \
--property schema.registry.url=http://schema-registry-2:8081 \
--bootstrap-server broker-2:9092 --topic bar --from-beginning
{"f1":"string1","f2":"string2"}:2
{"f1":"string1","f2":"string2"}:2
{"f1":"string1","f2":"string2"}:2

OK, this time the consumer successfully shows us the correct messages on the target “bar” topic, exactly what we were expecting to see. More interestingly, if you look closely, these messages/records now refer to a new schema ID, “2”.

Image by author

Let's check what this schema is on the target schema registry.

curl -s -X GET http://schema-registry-2:8081/schemas/ids/2
{"schema":"{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"myrecord\"}"}

The schema with ID “2” appears to be the same one we had on the source schema registry; more specifically, it is the same schema we sent along (via the Avro console producer) while producing messages on the source side. Apart from the schema being the same, though, there are two extra metadata fields: "connect.version":1 and "connect.name":"myrecord". The schema itself is unchanged; this is just metadata that the Replicator connector adds. We can disable this additional metadata by setting “connect.meta.data” to “false” in the Connect worker properties file.
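In the worker properties file, converter-level settings are prefixed with the converter name, so one way to disable the metadata looks like this (the file path is illustrative):

# etc/kafka/connect-distributed.properties
key.converter.connect.meta.data=false
value.converter.connect.meta.data=false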

Replicator acts as a consumer that reads and deserializes messages from the source cluster based on the “src.key.converter”/“src.value.converter” properties, and then produces and serializes the messages to the target Kafka cluster using the “key.converter”/“value.converter” properties.

Also, if we now check the target schema registry instance, we see it has registered a new subject, “bar-value”, with version 1.

curl -s -X GET http://schema-registry-2:8081/subjects/bar-value/versions
[1]

This version 1 under the “bar-value” subject uses schema ID “2”.

curl -s -X GET http://schema-registry-2:8081/subjects/bar-value/versions/1
{"subject":"bar-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"myrecord\"}"}

Although “AvroConverter” is a bit slower than “ByteArrayConverter” (not noticeable at low throughput), as we have seen, it rewrites the schema ID of messages when they reach the target. With ByteArrayConverter, the schema ID was preserved in the messages, which we didn't want, since schema ID “1” already existed on the target with a different schema.

There is one caveat, though: if schema ID “1” had been part of the “bar-value” subject on the target schema registry, then, depending on the compatibility settings (backward is the default), Replicator would have failed to produce these messages into the “bar” topic on the target. I cover this in more detail in Part 2 of this article.
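You can check the compatibility level currently in force on the target registry via its config endpoint:

curl -s -X GET http://schema-registry-2:8081/config
{"compatibilityLevel":"BACKWARD"}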

We would not face the above scenario with a single schema registry; it is generally recommended to have one unified, consolidated schema registry instance serving all environments. But when a consolidated schema registry is not an option or not already in place, the above approach with AvroConverter helps replicate the topic over and, hopefully, won't break consumers on the target side. If you are interested in single schema registry deployments, I would suggest reading the Confluent disaster recovery deployment whitepaper here.
