
Replicate Avro Messages To Target - Part2 (Conflicting Schema Exists On Target, Same Subject)

What to do when Replicator fails to insert messages to the target topic because the topic's subject has a version with conflicting schema?



This is a follow-up to Part 1, where we discussed what to expect when Replicator copies a topic with Avro messages to the target, but the target Schema Registry already holds the same schema ID (which is embedded in the messages) under a different subject, with a schema completely different from the one on the source. In this article, we will see how Confluent Replicator behaves when a subject with the same name already exists on the target Schema Registry and has a version with a conflicting schema.

This article assumes that the reader is familiar with Kafka brokers, Kafka Connect, Schema Registry, and Confluent Replicator.

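The following is the environment that we have on the source and target: broker-1 and schema-registry-1 on the source; broker-2, schema-registry-2, and connect-2 on the target.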

Practical Scenario:

Let’s assume you want to replicate the "bar" topic, which holds Avro messages with a schema ID of "1". On the target side, you have a separate Schema Registry instance that already has a subject, "bar-value" (topic name strategy), with version 1 registered, and this version conflicts with the one on the source Schema Registry.

Below are the steps to reproduce this issue, where Confluent Replicator fails to insert the messages into the target topic. Later on, I will discuss the workarounds we can use to remedy this.

1. Produce messages on source Kafka

Use the Avro console producer to produce Avro messages to the "bar" topic on the source Kafka cluster.

/usr/bin/kafka-avro-console-producer \
--broker-list broker-1:9092 --topic bar \
--property schema.registry.url=http://schema-registry-1:8081 \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string"}]}'
{"f1":"string1","f2":"string2"}
{"f1":"string1","f2":"string2"}
{"f1":"string1","f2":"string2"}
^C

Take note of the schema that we sent along, which consists of the "f1" and "f2" fields, both of type string. I have automatic schema registration enabled, which means that if a "bar-value" subject doesn’t already exist in Schema Registry, the producer first creates the subject (following the topic name strategy) and then registers its first version. Other subject naming strategies are available, such as record name strategy and topic record name strategy, but I’m using topic name strategy to keep it simple. If you are interested in learning more, refer to the Schema Registry documentation on subject name strategies.

Let’s check the subject that was created above in the Schema Registry of the source environment.

curl -s -X GET http://schema-registry-1:8081/subjects/bar-value/versions
[1]
curl -s -X GET http://schema-registry-1:8081/subjects/bar-value/versions/1
{"subject":"bar-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"string\"}]}"}

2. Inspect the schema on the target schema registry

As described in the scenario above, the topic’s subject already exists on the target Schema Registry and has a version with a conflicting schema.

curl -s -X GET http://schema-registry-2:8081/subjects/bar-value/versions
[1]
curl -s -X GET http://schema-registry-2:8081/subjects/bar-value/versions/1
{"subject":"bar-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"firstrecord\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"phone\",\"type\":\"int\"},{\"name\":\"housenumber\",\"type\":\"int\"},{\"name\":\"distance\",\"type\":\"int\"}]}"}

This schema under the "bar-value" subject on the target has four fields, all of type "int", as opposed to the two "string" fields of the schema on the source. This makes the two schemas incompatible with each other, as we will see shortly when Replicator tries to serialize the messages and fails to register the schema on the target. Check out the Schema Registry documentation to find out about the compatibility settings that it offers.

3. Run replicator on target

Deploy Replicator as a connector on the Connect instance in the target environment. I’m going to use the following configs for it.

If we now check the status of the connector, we will see that it has failed with the error "io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409"
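The exact connector configuration is not reproduced in this text. As a rough sketch only, and not the author's actual config, a Replicator connector for this scenario might be defined along these lines. The connector name and host names are taken from the commands elsewhere in this article; the converter choices, `tasks.max`, the replication factor, and the helper function names `replicator_config` and `deploy_replicator` are assumptions, so check them against your own environment.

```shell
#!/usr/bin/env bash
# Sketch only: property values are assumptions, not the original config.

# Print a candidate Replicator connector definition as JSON.
replicator_config() {
  cat <<'EOF'
{
  "name": "bar-avro-replicator",
  "config": {
    "connector.class": "io.confluent.connect.replicator.ReplicatorSourceConnector",
    "tasks.max": "1",
    "topic.whitelist": "bar",
    "src.kafka.bootstrap.servers": "broker-1:9092",
    "dest.kafka.bootstrap.servers": "broker-2:9092",
    "key.converter": "io.confluent.connect.replicator.util.ByteArrayConverter",
    "src.value.converter": "io.confluent.connect.avro.AvroConverter",
    "src.value.converter.schema.registry.url": "http://schema-registry-1:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry-2:8081",
    "confluent.topic.replication.factor": "1"
  }
}
EOF
}

# POST the definition to the Connect REST API on the target side.
# Defined but not called here, since it needs a reachable Connect worker.
deploy_replicator() {
  replicator_config | curl -s -X POST -H "Content-Type: application/json" \
    --data @- http://connect-2:8083/connectors
}
```

The point to notice is that the value converter on the destination side is the Avro converter pointed at the target Schema Registry, which is why Replicator attempts to register the schema there.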

This is depicted in the following diagram

With the "AvroConverter" class set in the "value.converter" property, Replicator serializes the data while inserting it and also tries to register the schema as a new version under the "bar-value" subject, because this schema doesn’t exist there yet. Schema Registry refuses to register this new version because it runs a compatibility check of the posted schema against the latest version of the subject (the all-"int" schema that we saw earlier). Since the newly posted schema is not compatible, as explained above, Replicator fails with the error shown above.

There are a few workarounds that can help get past this. I will cover some of them.
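The same compatibility check can be requested by hand before deploying Replicator. The sketch below uses the Schema Registry REST endpoint for testing a schema against the latest version of a subject; `schema_payload` and `check_compatibility` are hypothetical helper names, and the escaping assumes the schema is passed as a single line of JSON.

```shell
#!/usr/bin/env bash
# Sketch: ask a Schema Registry whether a schema would be accepted under a subject.

# Wrap a raw Avro schema (single-line JSON) in the {"schema":"<escaped>"}
# envelope used by the Schema Registry REST API.
schema_payload() {
  local escaped
  escaped=$(printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g')
  printf '{"schema":"%s"}\n' "$escaped"
}

# POST the schema to the compatibility endpoint for the subject's latest version.
# Defined but not called here, since it needs a reachable registry.
check_compatibility() {
  local registry_url="$1" subject="$2" schema="$3"
  schema_payload "$schema" | curl -s -X POST \
    -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data @- "$registry_url/compatibility/subjects/$subject/versions/latest"
}

# Example (needs a reachable registry):
# check_compatibility http://schema-registry-2:8081 bar-value \
#   '{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string"}]}'
```

A response in which "is_compatible" is false would indicate that registration under that subject is going to be rejected, which is the situation Replicator runs into here.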

Workarounds:

Use "topic.rename.format" in the Replicator config: With this additional property in the Replicator connector’s config, the replicated topic gets a new name on the target. Under the topic name strategy this means a new subject on the target Schema Registry, and Replicator is able to register its first version under it. Let’s check this.

Delete the existing connector if it still exists

curl -s -X DELETE http://connect-2:8083/connectors/bar-avro-replicator

Redeploy the connector with "topic.rename.format"
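The redeployed configuration is not shown in this text. Judging by the "bar-replica" topic and the "bar-replica-value" subject that appear in the output further down, the property was presumably set to something like `"topic.rename.format": "${topic}-replica"`; that value is an inference, not a quote. The sketch below only illustrates how such a format maps a source topic to a target topic and subject. The function names `rename_topic` and `value_subject` are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch: how a rename format maps a source topic to a target topic and subject.
# Assumed connector property (inferred, not quoted from the original config):
#   "topic.rename.format": "${topic}-replica"

# Expand the ${topic} placeholder in a rename format.
rename_topic() {
  local format="$1" topic="$2"
  printf '%s\n' "$format" | sed 's/\${topic}/'"$topic"'/g'
}

# Subject used for message values under the topic name strategy.
value_subject() {
  printf '%s-value\n' "$1"
}

target_topic=$(rename_topic '${topic}-replica' bar)
echo "$target_topic"                  # bar-replica
value_subject "$target_topic"         # bar-replica-value
```

Because the subject name is derived from the renamed topic, it no longer collides with the existing "bar-value" subject.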

Status should show "RUNNING"

curl -s -X GET http://connect-2:8083/connectors/bar-avro-replicator/status
{
    "connector": {
        "state": "RUNNING",
        "worker_id": "x.x.x.x:8083"
    },
    "name": "bar-avro-replicator",
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "x.x.x.x:8083"
        }
    ],
    "type": "source"
}

Let’s check if Replicator has created a new subject with the "-replica" suffix

curl -s -X GET http://schema-registry-2:8081/subjects/
["bar-value","bar-replica-value"]

Yes, "bar-replica-value" got created as it follows the topic name strategy. Now let’s inspect the version under this subject

curl -s -X GET http://schema-registry-2:8081/subjects/bar-replica-value/versions
[1]
curl -s -X GET http://schema-registry-2:8081/subjects/bar-replica-value/versions/1
{"subject":"bar-replica-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"string\"}],\"connect.version\":1,\"connect.name\":\"myrecord\"}"}

And there you have it: Replicator has successfully registered this schema. Note the schema ID that this version refers to, i.e. "2". Now it’s time to consume our messages from the "bar-replica" topic on the target Kafka cluster.

/usr/bin/kafka-avro-console-consumer \
--property print.schema.ids=true \
--property schema.id.separator=: \
--property schema.registry.url=http://schema-registry-2:8081 \
--bootstrap-server broker-2:9092 --topic bar-replica --from-beginning
{"f1":"string1","f2":"string2"}:2
{"f1":"string1","f2":"string2"}:2
{"f1":"string1","f2":"string2"}:2

The consumer also runs and successfully deserializes the messages by looking up schema ID "2". If you need more information about why we see a schema ID of "2" instead of "1", then check out this article, which has more details.

Delete the version on the target Schema Registry: An easy option (though not always an available one) is to delete the conflicting version "1" of the "bar-value" subject on the target Schema Registry.

curl -s -X DELETE http://schema-registry-2:8081/subjects/bar-value/versions/1     
1

Set the "NONE" compatibility setting for the "bar-value" subject: Again, this is easily done. Just update the compatibility setting of the "bar-value" subject to NONE, and Replicator should be able to register the new schema and proceed with replicating the messages.

Get the current config settings

curl -s -X GET http://schema-registry-2:8081/config/bar-value
{"error_code":40401,"message":"Subject not found."}

Don’t be thrown off by this misleading error. It is returned when no subject-level compatibility config exists, in which case the global compatibility setting applies to the subject.
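To see which level actually applies to a subject, one can fall back to the global config when the subject-level lookup returns the error above. This is a minimal sketch; `compat_from_response` and `effective_compatibility` are hypothetical helper names, and the `sed` pattern assumes the compact single-line JSON responses shown in this article.

```shell
#!/usr/bin/env bash
# Sketch: find the compatibility level that applies to a subject.

# Pull the level out of a response such as {"compatibilityLevel":"NONE"}.
# Prints nothing for an error body such as {"error_code":40401,...}.
compat_from_response() {
  printf '%s\n' "$1" | sed -n 's/.*"compatibilityLevel":"\([A-Z_]*\)".*/\1/p'
}

# Try the subject-level config first, then fall back to the global one.
# Defined but not called here, since it needs a reachable registry.
effective_compatibility() {
  local registry_url="$1" subject="$2" level
  level=$(compat_from_response "$(curl -s "$registry_url/config/$subject")")
  if [ -z "$level" ]; then
    level=$(compat_from_response "$(curl -s "$registry_url/config")")
  fi
  printf '%s\n' "$level"
}

# Example (needs a reachable registry):
# effective_compatibility http://schema-registry-2:8081 bar-value
```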

Update subject compatibility setting.

curl -s -X PUT -H "Content-Type: application/json" http://schema-registry-2:8081/config/bar-value --data '{"compatibility": "NONE"}'
{"compatibility":"NONE"}

If you now repeat the GET, you should see the compatibility setting that is currently in effect.

curl -s -X GET http://schema-registry-2:8081/config/bar-value
{"compatibilityLevel":"NONE"}

With this, the replicator should be able to proceed.

Please note that setting compatibility to NONE defeats the purpose of data governance in the first place. We should be very careful about setting "NONE" because it essentially disables the compatibility checks that Schema Registry does for us, and it’s not safe to use in production environments.

Conclusion:

With the "topic.rename.format" property of the Replicator connector, a new subject is created automatically, and the schema that Replicator read from the source (while deserializing) is registered under it. In my opinion, this is the safest of the options to choose from.

Replicate Avro Messages To Target - Part1 (Conflicting Schema Exists On Target)

