Kafka Connect offset.storage.topic not receiving messages (i.e. how to access Kafka Connect offset metadata?)

I am setting up a Kafka Connect distributed mode application that will be a Kafka-to-S3 pipeline. I am using Kafka 0.10.1.0-1 and Kafka Connect 3.1.1-1. So far things are going smoothly, but one aspect that is important to the larger system I am working on requires knowing the offset information of the Kafka -> FileSystem pipeline. According to the documentation, the offset.storage.topic configuration is where the distributed mode application stores its offset information. This makes sense, given that the 'new' consumer stores its offsets in Kafka itself. However, after doing some testing with the FileStreamSinkConnector, nothing is being written to my offset.storage.topic, which is set to the default value: connect-offsets.
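
For reference, here is the relevant excerpt from my worker config; the topic names follow the sample connect-distributed.properties that ships with Kafka, and only bootstrap.servers reflects my cluster:

    # connect-distributed.properties (excerpt)
    bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
    group.id=connect-cluster
    # internal topics used by the Connect cluster itself
    offset.storage.topic=connect-offsets
    config.storage.topic=connect-configs
    status.storage.topic=connect-status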

To be specific, I am using a Python Kafka producer to push data to a topic and using Kafka Connect with the FileStreamSinkConnector to write the data from that topic to a file. This works and behaves as I expect the connector to behave. Additionally, when I stop and restart the connector, the application remembers its position in the topic and there is no data duplication. However, when I check the offset.storage.topic to see what offset metadata is stored, there is nothing in the topic.
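
For concreteness, the connector is created through the Connect REST API roughly like this (the connector name, topic, and output path here are placeholders, not my exact values):

    curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
      "name": "file-sink-test",
      "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-input-topic",
        "file": "/tmp/connect-output.txt"
      }
    }'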

This is the command that I use:

kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning

I receive this message after letting this command run for a minute or so:

Processed a total of 0 messages

So to summarize, I have 2 questions:

  1. Why is offset metadata not being written to the topic that should be storing it, even though my distributed mode application is keeping state correctly?
  2. How do I access the offset metadata for a Kafka Connect distributed mode application? This is 100% necessary for my team's Lambda Architecture implementation of our system.

Thanks for the help.

There are 3 answers

Konstantine Karantasis:

The new S3 connector released by Confluent might be of interest to you.

From what you describe, it may significantly simplify your goal of exporting records from Kafka to your S3 buckets.
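
For what it's worth, here is a minimal sketch of how that connector might be configured, assuming the Confluent S3 connector is installed on the Connect workers (the bucket, region, and flush.size values are illustrative placeholders):

    curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
      "name": "s3-sink-test",
      "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "my-input-topic",
        "s3.bucket.name": "my-bucket",
        "s3.region": "us-east-1",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
        "flush.size": "1000"
      }
    }'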

Liju John:

The offsets might be committed to Kafka's default offset commit topic, i.e. __consumer_offsets.
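
One way to confirm this is to consume that internal topic directly with an offsets formatter. A rough sketch; the formatter class shown matches 0.10.x brokers, and on 0.11+ it moved to kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter:

    # the console consumer skips internal topics unless told otherwise
    echo "exclude.internal.topics=false" > /tmp/consumer.properties

    kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
      --topic __consumer_offsets --from-beginning \
      --consumer.config /tmp/consumer.properties \
      --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"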

Gwen Shapira:
  1. Liju is correct: connect-offsets is used to track offsets for source connectors (which have a producer but not a consumer). Sink connectors have a consumer and track offsets the usual way, in the __consumer_offsets topic.

  2. The best way to look at last committed offsets is with the consumer group tool:

    bin/kafka-consumer-groups.sh --group connect-elastic-login-connector --bootstrap-server localhost:9092 --describe

The group name is always "connect-" followed by the connector name (in my case, elastic-login-connector). This will show the latest offset committed by the group, which effectively acknowledges that all messages up to that offset were written to Elasticsearch.
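
Applied to the question's setup, and assuming the FileStreamSinkConnector was named file-sink-test as in the sketch above, the group to describe would be connect-file-sink-test (on 0.10.x clients you may also need to pass --new-consumer):

    kafka-consumer-groups --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 \
      --describe --group connect-file-sink-test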