Confluent Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving Apache Avro® schemas. It stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibilities settings and allows the evolution of schemas according to the configured compatibility settings and expanded Avro support. It provides serializers that plug into Apache Kafka® clients that handle schema storage and retrieval for Kafka messages that are sent in the Avro format.
The schema Topic ( Single Point of failure )
Even though you may have multiple schema registries to achieve high availability, but it could still have some single point of failure as all the schema registries point to topic called ( _schemas ). This is a special topic with compact enabled, which means it will delete the duplicate entries also, the message here will never expire.
In some cases, at least what I have seen, this topic gets corrupt and your schema registry effectively becomes useless which in turn your whole cluster into a useless cluster, where messages can be consumed but can’t be deserialized as all metadata ( Schema IDs ) are lost.
There are multiple ways to avoid such mishaps, one simple and easy way would be to take a periodic backup of the schema topic into a separate disk and have a script to recover the topic in case of corruption.
Failure case 1: You cant publish the schemas but you can read from them.
This is a classic case of topic corruption, where you are not able to publish schema ( write to Kafka topic (_schemas ), but you are able to read the schemas and is able to publish/consume messages in Kafka as well. The good news is that all schemas are already in the memory of JVM of schema registry, so you can simply pull the data from the rest API and republish them into newly created Kafka topic.
Create a Kafka topic with compact enable
$KAFKA_BIN/kafka-topics.sh --create --topic _schemas --zookeeper zk.becloudready.com:2181 --config cleanup.policy=compact
Pull all the schemas from the running schema registry and publish them back into Kafka topic
producer_list = ['kafka.becloudready.com:9096'] url = "http://kafka.becloudready.com:8081/subjects"print('Starting Schema recovery')p = Producer({'bootstrap.servers': producer_list[0]})response = requests.get(url)subjects = json.loads(response.text)for subject in subjects: versions = "{}/{}/versions".format(url,subject) r = requests.get(versions) j = json.loads(r.text) for version in j: schema_url = "{}/{}/versions/{}".format(url,subject,version) r = requests.get(schema_url) skey = {"subject":subject,"version":version,"magic":1,"keytype":"SCHEMA"} p.produce(topic, key=json.dumps(skey).encode('utf-8'), value=r.text.encode('utf-8')) p.flush()
Restart the Schema registry
Failure case 2: Recover the topic and spin up a new instance
The rare situation when that happens, you have no choice but to simply copy all the remaining good messages from the topic to a newly created topic. As of yet, Kafka does not allow renaming topics, so you will have to
Backup the data into another topic
def write_schema(skey,value): c.subscribe(["_schemas"])while True: try: j = json.loads(msg.key()) if j['keytype'] != 'DELETE_SUBJECT': p.produce("backuptopic_topic", key=skey, value=value)
Re-create the _schemas topic
Publish all backed up messages into the newly created topic
Start schema registry
To avoid future failures
Take backup from the Schema registry RestAPI to secondary storage or into git
Take a backup of the _schemas topic itself
Remember, when running a production system, you need to have backup of the backup
.
Comments