Ultimate Runbook to Manage Apache Kafka

 

if you are not running Confluent Platform 5 or higher, you may have to do day to day configuration settings on the command line and believe even after using it for over 3-4 years, I keep forgetting the commands especially for a secure environment with annoying KRB environment variables.

Kafka Quickstart guide does a pretty good job with instructions to doing basic operations but when it comes to a real-world enterprise application that does not always help you.

The biggest pain point is getting it working with Kerberos security, pretty much different distribution of Kafka uses different environment variables which could be a bit annoying when you have to do a last-minute quick fix on the Production box and you can’t find the right command.

Here is the master Cheat Sheet ( It will be keep growing )

Let’s do step by step

Creating Topics

export KAFKA_CLIENT_KERBEROS_PARAMS="-Djava.security.auth.login.config=client_jaas.conf"

$KAFKA_BIN/kafka-topics.sh --create --zookeeper zk.becloudready.com:2181 --partitions 5 --replication-factor 5 --topic $i

List All topics

$KAFKA_BIN/kafka-topics.sh --list --zookeeper zk.becloudready.com:2181

Alter Topic configurations ( Change retention period )

export KAFKA_CLIENT_KERBEROS_PARAMS="-Djava.security.auth.login.config=client_jaas.conf"

$KAFKA_BIN/kafka-configs.sh --zookeeper zk.becloudready.com:2181 --entity-type topics --alter --add-config retention.ms=100 --entity-name topic_name

Change Commit offsets ( Krb environment variable seems different for this )

The aforesaid command uses KAFKA_CLIENT_KERBEROS_PARAMS but this one uses KAFKA_OPTS

>export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"

 
>$KAFKA_BIN/kafka-consumer-groups.sh --bootstrap-server kafka.becloudready.com:9096 --group bcrGroupID --topic bcr.cloud.events --reset-offsets --to-datetime 2020-01-01T00:00:00.00  --execute --command-config server.properties

>cat server.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

Deleting Topics

bin/kafka-topics.sh --delete --zookeeper  kafka.becloudready.com:2181  --topic bcr.cloud.event

However, sometimes due to inconsistencies the simple delete command does not clean up the zookeeper content. In such a situation you have to manually delete the zookeeper entries.

Manually delete Kafka topics from Zookeeper

zookeeper_servers=zk.becloudready.com:2181
kafka_topic=bcr.cloud.events
bin/zookeeper-shell.sh ${zookeeper_servers} rmr /brokers/topics/${kafka_topic}
bin/zookeeper-shell.sh ${zookeeper_servers} rmr /config/topics/${kafka_topic}
bin/zookeeper-shell.sh ${zookeeper_servers} rmr /admin/delete_topics/${kafka_topic}

Mirror two topics

If you are not using Kafka mirror maker or for an odd reason, it does not work, you can use kafkacat to get your job done.

kafkacat -C -b $BOOTSTRAP_SERVERS -o beginning -e -t $SOURCE_TOPIC  | kafkacat -P -b $BOOTSTRAP_SERVERS  -t $TARGET_TOPIC

Secure Consumer/Producer

Consuming from the command line is pretty easy, however, if you are using a schema registry and want to serialize and de-serialize Avro messages then, the command line consumer may not be good enough, you need to write code. Here is an example of a Python code.

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError
from confluent_kafka.cimpl import TopicPartition
import os,uuid


topic = 'bcr.cloud.events'
bootstrap = 'kafka.becloudready.com:9096'
registry = 'kafka.becloudready.com:8081'
keytab_path = os.path.join('/keytabs/', 'events.keytab')


os.environ['KRB5_CONFIG'] = '/etc/krb5.conf'
c = AvroConsumer({
    'bootstrap.servers': bootstrap,
    'schema.registry.url': registry,
    'group.id': str(uuid.uuid4()),
    'default.topic.config': {
        'auto.offset.reset': 'earliest'
    },
    'sasl.kerberos.keytab': keytab_path,
    'security.protocol': "SASL_PLAINTEXT", # gives error :(
    'sasl.kerberos.principal': "events@BCR.COM",


    'sasl.kerberos.service.name': 'kafka',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.kerberos.kinit.cmd': 'kinit -kt "%{sasl.kerberos.keytab}" %{sasl.kerberos.principal}',
   
})


c.subscribe([topic])
running = True
while running:
    msg = None
    try:
        msg = c.poll(10)
        if msg:
            if not msg.error():
                print(msg.value())
                print(msg.key())
                print(msg.partition())
                print(msg.offset())
                c.commit(msg)
            elif msg.error().code() != KafkaError._PARTITION_EOF:
                print(msg.error())
                running = False
        else:
            print("No Message!! Happily trying again!!")
    except SerializerError as e:
        print("Message deserialization failed for %s: %s" % (msg, e))
        running = False
c.commit()
c.close()