Saturday, November 11, 2017

How to read an Apache Storm 1.0.2 Kafka Spout Zookeeper offset?

As with any streaming system, the consumer, in this case the Kafka spout, needs to keep track of the offset up to which it has read in a Kafka topic. This offset has to be persisted so that when the topology is restarted, the spout knows where to resume reading; otherwise it would start again from the beginning. To prevent this, the Storm Kafka spout lets one persist the offset in Zookeeper and reads it back automatically when the topology restarts.

To find this information, we need to log in to the Zookeeper command line shell.
cd /usr/local/zookeeper/zookeeper-3.4.9

bin/zkCli.sh -server zookeeper1:2181

Check the topic and its partition for the consumer. You need to type in your topic name, and if you have more than one partition, make sure you do this for every partition.

get /consumers/yourcompany/yourtopicname/partition_0

Sample response

{"topology":{"id":"YourTopologyInstanceId-1-1497152721","name":"YourTopologyName"},"offset":3673,"partition":0,"broker":{"host":"81387110753b","port":9092},"topic":"yourTopicName"}

The "offset":3673 field shows the offset up to which the Kafka spout has read from partition 0 of this topic.
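When several partitions have to be checked, the lookup above can be scripted. The following is only a minimal sketch and not part of the Storm or Zookeeper tooling: the helper names partition_paths and extract_offset are hypothetical, the znode layout is assumed to match the example path above, and the JSON is assumed to have the same shape as the sample response.

```shell
#!/bin/sh
# Hypothetical helpers; names and znode layout are assumptions
# taken from the example path and sample response above.

# Print the znode path of every partition of a topic.
# Usage: partition_paths <zkroot> <topic> <partition-count>
partition_paths() {
    root=$1; topic=$2; count=$3
    i=0
    while [ "$i" -lt "$count" ]; do
        echo "$root/$topic/partition_$i"
        i=$((i + 1))
    done
}

# Read the spout's JSON state on stdin and print only the numeric offset.
# Lines that do not contain an "offset" field are ignored.
extract_offset() {
    sed -n 's/.*"offset":\([0-9][0-9]*\).*/\1/p'
}
```

For example, assuming zkCli.sh accepts a command after its options, something like bin/zkCli.sh -server zookeeper1:2181 get /consumers/yourcompany/yourtopicname/partition_0 | extract_offset should print just the offset, and partition_paths /consumers/yourcompany yourtopicname 3 lists the paths to query for a topic with three partitions.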


How to configure how long a message is kept in a Kafka 0.10.1.0 topic?

Sometimes, due to disk size constraints or simply because we don't want old messages in Kafka, one might need messages in a Kafka topic to be cleared automatically.

Please note that, as of this writing, Kafka does not clear messages once they have been read by a consumer. Retention has to be configured by the user, so make sure that you have a setting that fits your purpose, i.e. your consumers should be able to read a message before it is deleted from Kafka. Once a message is deleted from Kafka there is no way to get it back.

The default retention for messages in a Kafka topic is 7 days.

Here we are changing this to 1 day.

cd /usr/local/kafka/kafka_2.11-0.10.1.0/bin

./kafka-topics.sh --zookeeper zookeeper1:2181 --alter --topic yourtopicname --config retention.ms=86400000
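The retention.ms value is simply the retention period expressed in milliseconds. As a small sketch of the arithmetic, the hypothetical helper days_to_ms below (not part of Kafka) converts whole days to the value used above:

```shell
#!/bin/sh
# Hypothetical helper: convert a retention period in whole days
# to milliseconds, the unit expected by retention.ms.
days_to_ms() {
    # days * 24 hours * 60 minutes * 60 seconds * 1000 milliseconds
    echo $(( $1 * 24 * 60 * 60 * 1000 ))
}
```

With this in place the command above could be written with --config retention.ms=$(days_to_ms 1), which avoids miscounting zeros when typing the value by hand.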

How to clear all messages in a Kafka 0.10.1.0 topic without deleting the topic?

Sometimes the volume of messages in a Kafka topic becomes overwhelming and we need a quick way to clear them without deleting the topic.

Note that deleting the topic is an option that should be used with caution in production.

Kafka topics have a concept of retention, i.e. how long a message in a topic is persisted. By default this setting is 7 days. We are going to make use of this to clear the messages in the topic.

We are going to set the retention to 1 second and then bring it back to 7 days. Log in to the Kafka host and go to the folder where Kafka is installed.


cd /usr/local/kafka/kafka_2.11-0.10.1.0/bin

./kafka-topics.sh --zookeeper zookeeper1:2181 --alter --topic yourtopicname --config retention.ms=1000


Wait for some time so that the messages are cleared. Then check whether any messages are left by reading the topic from the beginning:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yourtopicname --from-beginning

Make sure that you reset the retention to the original 7 days.

Important: Please do not forget this step, otherwise messages in this topic will keep being deleted as soon as they are older than 1 second.

cd /usr/local/kafka/kafka_2.11-0.10.1.0/bin

./kafka-topics.sh --zookeeper zookeeper1:2181 --alter --topic yourtopicname --config retention.ms=604800000
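Because forgetting the reset step is easy, the three steps can be wrapped into one function. This is only a sketch under stated assumptions: purge_topic, set_retention and the KAFKA_TOPICS variable are hypothetical names, the 7 day value is assumed to be the retention you want to restore, and the default wait of 300 seconds is an assumption based on the broker checking for expired log segments periodically (log.retention.check.interval.ms, 5 minutes by default), so it may need to be longer on your cluster.

```shell
#!/bin/sh
# Hypothetical wrapper around the commands shown above.
# KAFKA_TOPICS can be overridden if kafka-topics.sh lives elsewhere.
KAFKA_TOPICS="${KAFKA_TOPICS:-./kafka-topics.sh}"

# Usage: set_retention <zookeeper> <topic> <retention-ms>
set_retention() {
    "$KAFKA_TOPICS" --zookeeper "$1" --alter --topic "$2" --config "retention.ms=$3"
}

# Usage: purge_topic <zookeeper> <topic> [wait-seconds]
# Lowers retention to 1 second, waits, then restores 7 days.
purge_topic() {
    zk=$1; topic=$2; wait_seconds=${3:-300}
    # Stop early if the first change fails, so nothing is left half done.
    set_retention "$zk" "$topic" 1000 || return 1
    # Give the broker time to delete the expired messages.
    sleep "$wait_seconds"
    # Restore the original 7 day retention (604800000 ms).
    set_retention "$zk" "$topic" 604800000
}
```

A call such as purge_topic zookeeper1:2181 yourtopicname 600 would run from the Kafka bin directory. It is still worth checking the topic with the console consumer afterwards, since the sketch does not verify that the messages are actually gone.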