I have Kafka 0.8.1.2.2 running on a HDP 2.2.4 cluster (3 brokers on 3 ZK nodes - ZK 3.4.6.2.2). All worked well for a couple of days and now my topics seem to have become unreachable by producers and consumer. I am newer to Kafka and am seeking a way to determine what has gone wrong and how to fix it as "just re-installing" will not be an option once in production.
Previously, messages were successfully received by my topics and could then be consumed. Now, even the most basic of operations fails immediately. If I ssh to a broker node and create a new topic:
[root@dev-hdp-0 kafka]# bin/kafka-topics.sh --create --zookeeper 10.0.0.39:2181 --replication-factor 3 --partitions 3 --topic test4
Created topic "test4".
So far so good. Now, we check the description:
[root@dev-hdp-0 kafka]# bin/kafka-topics.sh --describe --zookeeper 10.0.0.39:2181 --topic test4
Topic:test4 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test4 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2
Topic: test4 Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: test4 Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
OK - now if I create a consumer:
[2015-06-09 08:34:27,458] WARN [console-consumer-45097_dev-hdp-0.cloud.stp-1.sparfu.com-1433856803464-12b54195-leader-finder-thread], Failed to add leader for partitions [test4,0],[test4,2],[test4,1]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
java.net.ConnectException: Connection timed out
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:465)
at sun.nio.ch.Net.connect(Net.java:457)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:142)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:124)
at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:157)
at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179)
at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174)
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.Map$Map3.foreach(Map.scala:154)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
[2015-06-09 08:35:30,709] WARN [console-consumer-45097_dev-hdp-0.cloud.stp-1.sparfu.com-1433856803464-12b54195-leader-finder-thread], Failed to add leader for partitions [test4,0],[test4,2],[test4,1]; will retry (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
I have been poking around for something related to Failed to add leader for partitions
as that seems to be key, but I have yet to find anything specific there that is helpful.
So, if I try using the simple consumer shell for a known partition:
[root@dev-hdp-0 kafka]# bin/kafka-simple-consumer-shell.sh --broker-list 10.0.0.39:6667,10.0.0.45:6667,10.0.0.48:6667 --skip-message-on-error --offset -1 --print-offsets --topic test4 --partition 0
Error: partition 0 does not exist for topic test4
Despite the --describe
operation showing clearly that partition 0
very much does exist.
I have a simple Spark application that publishes a small number of messages to a topic, but this too fails to publish (on brand new and older, previously working, topics). A excerpt from the console here also alludes to leader
issues:
15/06/08 15:05:35 WARN BrokerPartitionInfo: Error while fetching metadata [{TopicMetadata for topic test8 ->
No partition metadata for topic test8 due to kafka.common.LeaderNotAvailableException}] for topic [test8]: class kafka.common.LeaderNotAvailableException
15/06/08 15:05:35 ERROR DefaultEventHandler: Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test8
15/06/08 15:05:35 WARN BrokerPartitionInfo: Error while fetching metadata [{TopicMetadata for topic test8 ->
No partition metadata for topic test8 due to kafka.common.LeaderNotAvailableException}] for topic [test8]: class kafka.common.LeaderNotAvailableException
Additionally, if we try the console producer:
[root@dev-hdp-0 kafka]# bin/kafka-console-producer.sh --broker-list 10.0.0.39:6667,10.0.0.45:6667,10.0.0.48:6667 --topic test4
foo
[2015-06-09 08:58:36,456] WARN Error while fetching metadata [{TopicMetadata for topic test4 ->
No partition metadata for topic test4 due to kafka.common.LeaderNotAvailableException}] for topic [test4]: class kafka.common.LeaderNotAvailableException (kafka.producer.BrokerPartitionInfo)
I have scanned the logs under /var/log/kafka and nothing any more descriptive than the console output presents itself. Searching on the various exceptions has yielded little more than others with similar mysterious issues.
That all said is there a way to diagnose properly why my broker set suddenly stopped working when there has been no changes to the environment or configs? Has anyone encounter a similar scenario and found a corrective set of actions?
Some other details: All nodes are CentOS 6.6 on an OpenStack private cloud HDP Cluster 2.2.4.2-2 installed and configured using Ambari 2.0.0 Kafka service has been restarted (a few times now...)
Not sure what else might be helpful - let me know if there are other details that could help to shed light on the problem.
Thank you.
Looks like forcibly stopping (kill -9) and restarting kafka did the trick.
Graceful shutdown didn't work.
Looking at the boot scripts, kafka and zookeeper where coming up at the same time (S20kafka, S20zookeeper) - so perhaps that was the initial problem. For now... not going to reboot this thing.