beCloudReady Team

Ultimate Runbook to Manage Apache Kafka




If you are not running Confluent Platform 5 or higher, you may have to make day-to-day configuration changes on the command line. Believe me, even after using Kafka for over 3–4 years, I keep forgetting the commands, especially in a secure environment with annoying Kerberos (KRB) environment variables.

The Kafka Quickstart guide does a pretty good job of explaining basic operations, but it does not always help when it comes to a real-world enterprise application.

The biggest pain point is getting it working with Kerberos security. Pretty much every Kafka distribution uses different environment variables, which can be annoying when you have to make a last-minute fix on the production box and can't find the right command.

Here is the master cheat sheet (it will keep growing).

Let's go step by step.

Creating Topics

export KAFKA_CLIENT_KERBEROS_PARAMS="-Djava.security.auth.login.config=client_jaas.conf"
$KAFKA_BIN/kafka-topics.sh --create --zookeeper zk.becloudready.com:2181 --partitions 5 --replication-factor 5 --topic $i
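The KAFKA_CLIENT_KERBEROS_PARAMS export above only points the JVM at a JAAS file (client_jaas.conf); the keytab and principal live inside that file. The sketch below renders a typical keytab-based KafkaClient section. The helper name render_client_jaas is hypothetical, and the keytab path and principal are assumptions borrowed from the Python consumer example later in this post; check the login module options against your distribution before relying on it.

```python
def render_client_jaas(keytab: str, principal: str, service_name: str = "kafka") -> str:
    """Render a keytab-based KafkaClient JAAS section (hypothetical helper)."""
    lines = [
        "KafkaClient {",
        "    com.sun.security.auth.module.Krb5LoginModule required",
        "    useKeyTab=true",  # authenticate from the keytab, no password prompt
        "    storeKey=true",  # keep the principal's key in the Subject's credentials
        f'    keyTab="{keytab}"',
        f'    principal="{principal}"',
        f'    serviceName="{service_name}";',  # the semicolon ends the login module entry
        "};",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Assumed values, borrowed from the Python consumer example in this post.
    print(render_client_jaas("/keytabs/events.keytab", "events@BCR.COM"), end="")
```

Saved as, say, render_jaas.py (a hypothetical file name), running python render_jaas.py > client_jaas.conf produces the file that the export above refers to.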

List All Topics

$KAFKA_BIN/kafka-topics.sh --list --zookeeper zk.becloudready.com:2181
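When you feed the --list output into a script, you usually want to drop Kafka's internal topics (names starting with a double underscore, such as __consumer_offsets). A minimal sketch, assuming the output is one topic name per line; the helper name user_topics is hypothetical. Some older ZooKeeper-based releases append " - marked for deletion" to topics pending deletion, so only the first token of each line is kept (topic names cannot contain spaces).

```python
from typing import List


def user_topics(list_output: str) -> List[str]:
    """Return non-internal topic names from `kafka-topics.sh --list` output."""
    topics = []
    for line in list_output.splitlines():
        tokens = line.split()
        if not tokens:
            continue  # skip blank lines
        # Keep only the topic name, dropping any " - marked for deletion" suffix.
        name = tokens[0]
        # Skip internal topics such as __consumer_offsets.
        if not name.startswith("__"):
            topics.append(name)
    return topics


if __name__ == "__main__":
    sample = "__consumer_offsets\nbcr.cloud.events\n"
    print(user_topics(sample))  # ['bcr.cloud.events']
```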

Alter Topic Configurations (Change Retention Period)

export KAFKA_CLIENT_KERBEROS_PARAMS="-Djava.security.auth.login.config=client_jaas.conf"
$KAFKA_BIN/kafka-configs.sh --zookeeper zk.becloudready.com:2181 --entity-type topics --alter --add-config retention.ms=100 --entity-name topic_name
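Note that retention.ms is in milliseconds, so the command above keeps data for only 100 ms. That is a common trick for purging a topic (segments older than the retention become eligible for deletion), but you will want to set a realistic value again afterwards. A small sketch for converting days or hours into retention.ms; the helper name retention_ms is hypothetical.

```python
from datetime import timedelta


def retention_ms(days: float = 0, hours: float = 0, minutes: float = 0) -> int:
    """Convert a duration into a retention.ms value (hypothetical helper)."""
    delta = timedelta(days=days, hours=hours, minutes=minutes)
    # Floor-dividing two timedeltas yields an int, avoiding float rounding.
    return delta // timedelta(milliseconds=1)


if __name__ == "__main__":
    # Kafka's broker default (log.retention.hours=168) is 7 days.
    print(f"retention.ms={retention_ms(days=7)}")  # retention.ms=604800000
```

The printed value can be pasted straight into --add-config in place of retention.ms=100.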

Change Committed Offsets (the Kerberos environment variable is different here)

The previous commands use KAFKA_CLIENT_KERBEROS_PARAMS, but this one uses KAFKA_OPTS.

export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
$KAFKA_BIN/kafka-consumer-groups.sh --bootstrap-server kafka.becloudready.com:9096 --group bcrGroupID --topic bcr.cloud.events --reset-offsets --to-datetime 2020-01-01T00:00:00.000 --execute --command-config server.properties
cat server.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
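--to-datetime expects a timestamp in the form YYYY-MM-DDTHH:mm:SS.sss. When scripting a reset to, say, 24 hours ago, a sketch like the one below builds that value. The helper names are hypothetical, and it assumes (as far as we know) that a timestamp without a timezone offset is interpreted in the local timezone of the JVM running the tool, so it formats local time.

```python
from datetime import datetime, timedelta
from typing import Optional


def to_datetime_arg(moment: datetime) -> str:
    """Format a datetime as YYYY-MM-DDTHH:mm:SS.sss for --to-datetime."""
    # strftime has no millisecond directive, so append microseconds // 1000.
    return moment.strftime("%Y-%m-%dT%H:%M:%S.") + f"{moment.microsecond // 1000:03d}"


def hours_ago_arg(hours: float, now: Optional[datetime] = None) -> str:
    """Return the --to-datetime value for `hours` before `now` (defaults to local now)."""
    now = now or datetime.now()
    return to_datetime_arg(now - timedelta(hours=hours))


if __name__ == "__main__":
    print(hours_ago_arg(24))
```

Two things worth remembering with this command: replacing --execute with --dry-run prints the planned offsets without applying them, and the reset only succeeds when the consumer group has no active members, so stop the consumers first.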

Deleting Topics

bin/kafka-topics.sh --delete --zookeeper kafka.becloudready.com:2181 --topic bcr.cloud.events

However, due to inconsistencies, the simple delete command sometimes does not clean up the topic's entries in ZooKeeper. In that situation, you have to delete the ZooKeeper entries manually.

Manually delete Kafka topics from ZooKeeper

zookeeper_servers=zk.becloudready.com:2181
kafka_topic=bcr.cloud.events
bin/zookeeper-shell.sh ${zookeeper_servers} rmr /brokers/topics/${kafka_topic}
bin/zookeeper-shell.sh ${zookeeper_servers} rmr /config/topics/${kafka_topic}
bin/zookeeper-shell.sh ${zookeeper_servers} rmr /admin/delete_topics/${kafka_topic}
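The three znodes follow a fixed per-topic layout, so during an incident it can help to generate the commands instead of retyping paths by hand. A minimal sketch; the helper name zk_delete_commands is hypothetical. ZooKeeper 3.5 deprecated rmr in favor of deleteall, so the verb is a parameter in case your zookeeper-shell.sh rejects rmr.

```python
import shlex
from typing import List

# Per-topic znode prefixes, as used in the commands above.
TOPIC_ZNODE_PREFIXES = ["/brokers/topics", "/config/topics", "/admin/delete_topics"]


def zk_delete_commands(zookeeper_servers: str, topic: str, verb: str = "rmr",
                       shell: str = "bin/zookeeper-shell.sh") -> List[str]:
    """Build one zookeeper-shell.sh command per topic znode (hypothetical helper)."""
    commands = []
    for prefix in TOPIC_ZNODE_PREFIXES:
        args = [shell, zookeeper_servers, verb, f"{prefix}/{topic}"]
        # shlex.join quotes arguments only when needed, so the line is safe to paste.
        commands.append(shlex.join(args))
    return commands


if __name__ == "__main__":
    for cmd in zk_delete_commands("zk.becloudready.com:2181", "bcr.cloud.events"):
        print(cmd)
```

Printing the commands rather than executing them leaves room to double-check the topic name before anything is removed.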

Mirror Two Topics

If you are not using Kafka MirrorMaker, or if for some odd reason it does not work, you can use kafkacat to get the job done.

kafkacat -C -b $BOOTSTRAP_SERVERS -o beginning -e -t $SOURCE_TOPIC  | kafkacat -P -b $BOOTSTRAP_SERVERS  -t $TARGET_TOPIC
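The pipe works because kafkacat in consumer mode (-C) writes one message per line to stdout and in producer mode (-P) reads one message per line from stdin; -o beginning starts at the earliest offset and -e exits at the end of the topic. Keys, headers, and the original partition assignment are not carried over, and values containing newlines would be split. A rough Python equivalent of the same pipe, assuming kafkacat is on the PATH; the helper names are hypothetical, and extra is a place for librdkafka settings passed with -X (for example on a Kerberized cluster).

```python
import subprocess
from typing import List, Sequence, Tuple


def mirror_args(brokers: str, source: str, target: str,
                extra: Sequence[str] = ()) -> Tuple[List[str], List[str]]:
    """Argument lists for the consumer and producer sides of the kafkacat pipe."""
    # -o beginning: start at the earliest offset; -e: exit at the end of the topic.
    consume = ["kafkacat", "-C", "-b", brokers, "-o", "beginning", "-e", "-t", source, *extra]
    produce = ["kafkacat", "-P", "-b", brokers, "-t", target, *extra]
    return consume, produce


def mirror_topic(brokers: str, source: str, target: str,
                 extra: Sequence[str] = ()) -> int:
    """Equivalent of `kafkacat -C ... | kafkacat -P ...`; returns the producer's exit code."""
    consume, produce = mirror_args(brokers, source, target, extra)
    consumer = subprocess.Popen(consume, stdout=subprocess.PIPE)
    # The producer reads the consumer's stdout directly, like a shell pipe.
    producer = subprocess.Popen(produce, stdin=consumer.stdout)
    consumer.stdout.close()  # so the consumer gets SIGPIPE if the producer exits early
    producer.wait()
    consumer.wait()
    return producer.returncode
```

On a Kerberized cluster, extra might look like ["-X", "security.protocol=SASL_PLAINTEXT", "-X", "sasl.kerberos.service.name=kafka"], mirroring the properties used elsewhere in this post.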

Secure Consumer/Producer


Consuming from the command line is pretty easy. However, if you are using a schema registry and want to serialize and deserialize Avro messages, the command-line consumer may not be good enough and you need to write code. Here is an example in Python.


from confluent_kafka import KafkaError
# AvroConsumer is the legacy API; newer confluent-kafka releases recommend DeserializingConsumer.
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
import os
import uuid

topic = 'bcr.cloud.events'
bootstrap = 'kafka.becloudready.com:9096'
# The schema registry client expects a full URL, including the scheme.
registry = 'http://kafka.becloudready.com:8081'
keytab_path = os.path.join('/keytabs/', 'events.keytab')

# kinit and the Kerberos libraries read the krb5 config location from this variable.
os.environ['KRB5_CONFIG'] = '/etc/krb5.conf'

c = AvroConsumer({
    'bootstrap.servers': bootstrap,
    'schema.registry.url': registry,
    'group.id': str(uuid.uuid4()),  # throwaway group, so every run starts fresh
    'auto.offset.reset': 'earliest',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.kerberos.service.name': 'kafka',
    'sasl.kerberos.keytab': keytab_path,
    'sasl.kerberos.principal': 'events@BCR.COM',
    'sasl.kerberos.kinit.cmd': 'kinit -kt "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}',
})

c.subscribe([topic])

running = True
try:
    while running:
        msg = None
        try:
            msg = c.poll(10)
            if msg is None:
                print("No message! Happily trying again!")
            elif not msg.error():
                print(msg.value())  # deserialized Avro value
                print(msg.key())
                print(msg.partition())
                print(msg.offset())
                c.commit(message=msg)  # commit this message's offset
            elif msg.error().code() != KafkaError._PARTITION_EOF:
                print(msg.error())
                running = False
        except SerializerError as e:
            print("Message deserialization failed for %s: %s" % (msg, e))
            running = False
finally:
    c.close()  # leave the consumer group cleanly
