top of page

Apache Kafka Schema Registry Hacks

Confluent Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Apache Avro® schemas. It stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibilities settings and allows the evolution of schemas according to the configured compatibility settings and expanded Avro support. It provides serializers that plug into Apache Kafka® clients that handle schema storage and retrieval for Kafka messages that are sent in the Avro format.

The schema Topic ( Single Point of failure )

Even though you may have multiple schema registries to achieve high availability, but it could still have some single point of failure as all the schema registries point to topic called ( _schemas ). This is a special topic with compact enabled, which means it will delete the duplicate entries also, the message here will never expire.

In some cases, at least what I have seen, this topic gets corrupt and your schema registry effectively becomes useless which in turn your whole cluster into a useless cluster, where messages can be consumed but can’t be deserialized as all metadata ( Schema IDs ) are lost.

There are multiple ways to avoid such mishaps, one simple and easy way would be to take a periodic backup of the schema topic into a separate disk and have a script to recover the topic in case of corruption.

Failure case 1: You cant publish the schemas but you can read from them.

This is a classic case of topic corruption, where you are not able to publish schema ( write to Kafka topic (_schemas ), but you are able to read the schemas and is able to publish/consume messages in Kafka as well. The good news is that all schemas are already in the memory of JVM of schema registry, so you can simply pull the data from the rest API and republish them into newly created Kafka topic.

Create a Kafka topic with compact enable

$KAFKA_BIN/ --create --topic _schemas --zookeeper --config cleanup.policy=compact

Pull all the schemas from the running schema registry and publish them back into Kafka topic

producer_list = [''] url = ""print('Starting Schema recovery')p = Producer({'bootstrap.servers': producer_list[0]})response = requests.get(url)subjects = json.loads(response.text)for subject in subjects:    versions = "{}/{}/versions".format(url,subject)    r = requests.get(versions)    j = json.loads(r.text)    for version in j:        schema_url = "{}/{}/versions/{}".format(url,subject,version)        r = requests.get(schema_url)        skey = {"subject":subject,"version":version,"magic":1,"keytype":"SCHEMA"}        p.produce(topic, key=json.dumps(skey).encode('utf-8'), value=r.text.encode('utf-8'))        p.flush()

Restart the Schema registry

Failure case 2: Recover the topic and spin up a new instance

The rare situation when that happens, you have no choice but to simply copy all the remaining good messages from the topic to a newly created topic. As of yet, Kafka does not allow renaming topics, so you will have to

  • Backup the data into another topic

def write_schema(skey,value):    c.subscribe(["_schemas"])while True:    try:        j = json.loads(msg.key())        if j['keytype'] != 'DELETE_SUBJECT':            p.produce("backuptopic_topic", key=skey, value=value)
  • Re-create the _schemas topic

  • Publish all backed up messages into the newly created topic

  • Start schema registry

To avoid future failures

  1. Take backup from the Schema registry RestAPI to secondary storage or into git

  2. Take a backup of the _schemas topic itself

  3. Remember, when running a production system, you need to have backup of the backup


3 views0 comments


bottom of page