Ultimate Runbook to Manage Apache Kafka

If you are not running Confluent Platform 5 or higher, you may have to do day-to-day configuration on the command line. Believe me, even after using Kafka for over 3–4 years, I keep forgetting the commands, especially for a secure environment with its annoying Kerberos (KRB) environment variables.

The Kafka Quickstart guide does a pretty good job of explaining basic operations, but it does not always help when it comes to a real-world enterprise application.

The biggest pain point is getting things working with Kerberos security. Pretty much every distribution of Kafka uses different environment variables, which gets annoying when you have to make a last-minute fix on a production box and can’t find the right command.

Here is the master cheat sheet (it will keep growing).

Let’s go step by step.

Creating Topics

$KAFKA_BIN/ --create --zookeeper --partitions 5 --replication-factor 5 --topic $i
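
The `$i` in the command above suggests it was run inside a loop over topic names. Here is a minimal Python sketch of such a loop that only builds and prints the command lines. The script name `kafka-topics.sh`, the `/opt/kafka/bin` path, the ZooKeeper address `zk1:2181` and the topic names are all assumptions and do not come from this post. Keep in mind that a replication factor of 5 needs at least 5 brokers.

```python
import shlex

# Assumed values (not from the post): adjust to your installation.
KAFKA_BIN = "/opt/kafka/bin"
ZOOKEEPER = "zk1:2181"


def create_topic_cmd(topic, partitions=5, replication_factor=5):
    """Return the argv list for creating one topic via the ZooKeeper-based CLI."""
    return [
        f"{KAFKA_BIN}/kafka-topics.sh",  # assumed script name
        "--create",
        "--zookeeper", ZOOKEEPER,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
        "--topic", topic,
    ]


if __name__ == "__main__":
    for name in ["orders", "payments"]:  # hypothetical topic names
        # Print instead of executing, so the commands can be reviewed first.
        print(shlex.join(create_topic_cmd(name)))
```

Printing first and pasting afterwards is a deliberate choice for a production box: you get to review every command before it runs.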

List All topics

$KAFKA_BIN/ --list --zookeeper

Alter Topic Configurations (change retention period)

$KAFKA_BIN/ --zookeeper --entity-type topics --alter --add-config --entity-name topic_name
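
Retention is set in milliseconds through the `retention.ms` topic config, and the conversion is easy to get wrong under pressure. The sketch below converts days to milliseconds and builds the matching command line. The script name `kafka-configs.sh`, the install path and the ZooKeeper address are assumptions, and the topic name is hypothetical.

```python
import shlex
from datetime import timedelta

# Assumed values (not from the post): adjust to your installation.
KAFKA_BIN = "/opt/kafka/bin"
ZOOKEEPER = "zk1:2181"


def retention_ms(days):
    """Convert a retention period in days to the milliseconds Kafka expects."""
    return int(timedelta(days=days).total_seconds() * 1000)


def alter_retention_cmd(topic, days):
    """Return the argv list that sets retention.ms on one topic."""
    return [
        f"{KAFKA_BIN}/kafka-configs.sh",  # assumed script name
        "--zookeeper", ZOOKEEPER,
        "--entity-type", "topics",
        "--alter",
        "--add-config", f"retention.ms={retention_ms(days)}",
        "--entity-name", topic,
    ]


if __name__ == "__main__":
    # Hypothetical topic name; prints the command for review.
    print(shlex.join(alter_retention_cmd("topic_name", 7)))
```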

Reset Committed Offsets (the Kerberos environment variable is different for this one)

The previous command uses KAFKA_CLIENT_KERBEROS_PARAMS for its Kerberos settings, but this one uses KAFKA_OPTS.

export KAFKA_OPTS=""
$KAFKA_BIN/ --bootstrap-server --group bcrGroupID --topic --reset-offsets --to-datetime 2020-01-01T00:00:00.000 --execute --command-config
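
Two things tend to go wrong with an offset reset: the timestamp format and the environment the tool runs in. The sketch below formats a Python `datetime` in the `YYYY-MM-DDTHH:mm:SS.sss` shape and builds the environment and argument list together. It assumes that `KAFKA_OPTS` carries the JAAS login config through the standard JVM property `java.security.auth.login.config`. The script name `kafka-consumer-groups.sh`, both file paths and the broker address are hypothetical.

```python
import os
from datetime import datetime

# Assumed values (not from the post): adjust to your installation.
KAFKA_BIN = "/opt/kafka/bin"
JAAS_CONF = "/etc/kafka/client_jaas.conf"      # hypothetical path
CLIENT_PROPS = "/etc/kafka/client.properties"  # hypothetical path


def to_datetime_arg(dt):
    """Format dt as YYYY-MM-DDTHH:mm:SS.sss (millisecond precision)."""
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}"


def reset_offsets(bootstrap, group, topic, dt):
    """Return (env, argv) for resetting a group's offsets on one topic."""
    env = dict(
        os.environ,
        KAFKA_OPTS=f"-Djava.security.auth.login.config={JAAS_CONF}",
    )
    argv = [
        f"{KAFKA_BIN}/kafka-consumer-groups.sh",  # assumed script name
        "--bootstrap-server", bootstrap,
        "--group", group,
        "--topic", topic,
        "--reset-offsets",
        "--to-datetime", to_datetime_arg(dt),
        "--execute",
        "--command-config", CLIENT_PROPS,
    ]
    return env, argv


if __name__ == "__main__":
    env, argv = reset_offsets("broker1:9092", "bcrGroupID", "events",
                              datetime(2020, 1, 1))
    print("KAFKA_OPTS=" + env["KAFKA_OPTS"])
    print(" ".join(argv))
```

Passing `env` and `argv` to `subprocess.run(argv, env=env, check=True)` would execute the reset. Dropping `--execute` from the list should give a dry run on Kafka versions that support it.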

Deleting Topics

bin/ --delete --zookeeper  --topic

However, sometimes, due to inconsistencies, the simple delete command does not clean up the ZooKeeper content. In that situation you have to delete the ZooKeeper entries manually.

Manually delete Kafka topics from ZooKeeper

bin/ ${zookeeper_servers} rmr /brokers/topics/${kafka_topic}
bin/ ${zookeeper_servers} rmr /config/topics/${kafka_topic}
bin/ ${zookeeper_servers} rmr /admin/delete_topics/${kafka_topic}
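
A recursive delete with an empty topic variable would wipe `/brokers/topics` itself, so it is worth guarding the topic name before building these commands. The sketch below generates the three znode paths and refuses empty names or names containing a slash. The default script name `bin/zookeeper-shell.sh` is an assumption, and the ZooKeeper address and topic name in the demo are hypothetical. On ZooKeeper 3.5 and later, `rmr` is deprecated in favour of `deleteall`.

```python
def topic_znodes(topic):
    """Return the ZooKeeper paths that hold state for one Kafka topic."""
    if not topic or "/" in topic:
        # An empty name would target the parent znodes and delete every topic.
        raise ValueError(f"refusing unsafe topic name: {topic!r}")
    return [
        f"/brokers/topics/{topic}",
        f"/config/topics/{topic}",
        f"/admin/delete_topics/{topic}",
    ]


def zk_cleanup_cmds(zookeeper_servers, topic, shell="bin/zookeeper-shell.sh"):
    """Return one argv list per znode to remove (script name is assumed)."""
    return [[shell, zookeeper_servers, "rmr", path]
            for path in topic_znodes(topic)]


if __name__ == "__main__":
    for argv in zk_cleanup_cmds("zk1:2181", "my_topic"):
        print(" ".join(argv))
```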

Mirror two topics

If you are not using Kafka MirrorMaker, or for some odd reason it does not work, you can use kafkacat to get the job done.

kafkacat -C -b $BOOTSTRAP_SERVERS -o beginning -e -t $SOURCE_TOPIC  | kafkacat -P -b $BOOTSTRAP_SERVERS  -t $TARGET_TOPIC
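
If you would rather drive the same pipe from a script and check both exit codes, here is a minimal sketch using `subprocess`. It assumes `kafkacat` is on the `PATH`; the function names are my own.

```python
import subprocess


def mirror_cmds(bootstrap, source, target):
    """Return the consumer and producer argv lists for the kafkacat pipe."""
    consume = ["kafkacat", "-C", "-b", bootstrap,
               "-o", "beginning", "-e", "-t", source]
    produce = ["kafkacat", "-P", "-b", bootstrap, "-t", target]
    return consume, produce


def mirror(bootstrap, source, target):
    """Copy source into target once, raising if either side fails."""
    consume, produce = mirror_cmds(bootstrap, source, target)
    consumer = subprocess.Popen(consume, stdout=subprocess.PIPE)
    producer = subprocess.Popen(produce, stdin=consumer.stdout)
    # Close our copy of the pipe so the consumer sees a broken pipe
    # if the producer exits early.
    consumer.stdout.close()
    producer_rc = producer.wait()
    consumer_rc = consumer.wait()
    if consumer_rc != 0 or producer_rc != 0:
        raise RuntimeError(
            f"mirror failed: consumer={consumer_rc}, producer={producer_rc}")
```

As with the shell one-liner, this copies payloads only and splits messages on newlines. Keys are dropped, and a payload that contains a newline arrives as several messages.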

Secure Consumer/Producer

Consuming from the command line is pretty easy. However, if you are using a schema registry and want to serialize and deserialize Avro messages, the command-line consumer may not be good enough and you need to write code. Here is an example in Python.

import os
import uuid

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Fill these in for your environment.
topic = ''
bootstrap = ''
registry = ''
keytab_path = os.path.join('/keytabs/', 'events.keytab')

os.environ['KRB5_CONFIG'] = '/etc/krb5.conf'

c = AvroConsumer({
    'bootstrap.servers': bootstrap,
    'schema.registry.url': registry,
    # Key name is blank in the original listing; a random UUID here is presumably the 'group.id'.
    '': str(uuid.uuid4()),
    'default.topic.config': {
        'auto.offset.reset': 'earliest'
    },
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.kerberos.keytab': keytab_path,
    'sasl.kerberos.principal': "events@BCR.COM",
    # Key name is blank in the original listing; presumably 'sasl.kerberos.service.name'.
    '': 'kafka',
    'sasl.kerberos.kinit.cmd': 'kinit -kt "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}',
})

c.subscribe([topic])
running = True
while running:
    msg = None
    try:
        msg = c.poll(10)
        if msg is None:
            # poll() timed out without returning a message
            print("No Message!! Happily trying again!!")
        elif not msg.error():
            print(msg.value())
            print(msg.key())
            print(msg.partition())
            print(msg.offset())
            c.commit(message=msg)
        elif msg.error().code() != KafkaError._PARTITION_EOF:
            print(msg.error())
            running = False
    except SerializerError as e:
        print("Message deserialization failed for %s: %s" % (msg, e))
        running = False

# Every message is committed in the loop, so just leave the group cleanly.
c.close()

