I am setting up a Kafka Connect application in distributed mode that will act as a Kafka-to-S3 pipeline. I am using Kafka 0.10.1.0-1 and Kafka Connect 3.1.1-1. So far things are going smoothly, but the larger system I am working with needs offset information for the Kafka -> FileSystem pipeline. According to the documentation, the offset.storage.topic configuration is where a distributed-mode application stores its offset information. This makes sense, given that the 'new' Kafka also stores consumer offsets in a topic. However, after some testing with the FileStreamSinkConnector, nothing is being written to my offset.storage.topic, which is set to the default value, connect-offsets.
To be specific, I am using a Python Kafka producer to push data to a topic, and Kafka Connect with the FileStreamSinkConnector to write the data from that topic to a file. This works, and the connector behaves as I expect. Additionally, when I stop and restart the connector, it remembers its position in the topic and no data is duplicated. However, when I check the offset.storage.topic to see what offset metadata is stored, the topic is empty.
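A minimal sketch of a producer like the one described above, for anyone trying to reproduce the setup. It assumes the kafka-python client, a hypothetical topic named connect-test, and a worker whose value.converter is StringConverter, so that each value reaches FileStreamSinkConnector as plain text and is written to the file as one line.

```python
"""Sketch of a Python producer feeding the topic read by FileStreamSinkConnector.

Assumptions: kafka-python is installed, the topic name "connect-test" is a
hypothetical placeholder, and the Connect worker uses StringConverter for values.
"""
from typing import Iterable, List

BOOTSTRAP_SERVERS: List[str] = ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
TOPIC = "connect-test"  # hypothetical; use the topic your connector reads


def encode_value(line: str) -> bytes:
    """Serialize one text line as UTF-8 bytes for the record value."""
    return line.encode("utf-8")


def produce_lines(lines: Iterable[str], topic: str = TOPIC) -> None:
    """Send each line as one record, then block until all sends complete."""
    # Lazy import so encode_value stays usable without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        value_serializer=encode_value,
    )
    try:
        for line in lines:
            producer.send(topic, line)
        producer.flush()  # wait for outstanding records to be acknowledged
    finally:
        producer.close()
```

For example, produce_lines(["first line", "second line"]) should make the sink append those two lines to its output file.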
This is the command that I use:
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning
I receive this message after letting this command run for a minute or so:
Processed a total of 0 messages
To summarize, I have two questions:
- Why is no offset metadata being written to the topic that is supposed to store it, even though my distributed application keeps its state correctly?
- How do I access offset metadata information for a Kafka Connect distributed mode application? This is 100% necessary for my team's Lambda Architecture implementation of our system.
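For reference, a minimal sketch of one way to read this metadata. It relies on how Kafka Connect is designed: offset.storage.topic holds offsets for source connectors, while a sink connector such as FileStreamSinkConnector commits its progress as ordinary consumer offsets under the consumer group connect-<connector name>. The sketch assumes the kafka-python client; the connector name local-file-sink and topic connect-test used in the usage note are hypothetical placeholders.

```python
"""Sketch: read the committed offsets and lag of a Kafka Connect sink connector.

Assumes kafka-python is installed; connector and topic names are supplied by the caller.
"""
from typing import Dict, Optional


def sink_group_id(connector_name: str) -> str:
    """Kafka Connect runs a sink connector's tasks in the consumer group connect-<name>."""
    return "connect-" + connector_name


def compute_lag(committed: Dict[int, Optional[int]],
                end_offsets: Dict[int, int]) -> Dict[int, Optional[int]]:
    """Per-partition lag; None where the group has not committed an offset yet."""
    lag: Dict[int, Optional[int]] = {}
    for partition, end in end_offsets.items():
        done = committed.get(partition)
        lag[partition] = end - done if done is not None else None
    return lag


def fetch_sink_offsets(bootstrap_servers, connector_name: str, topic: str):
    """Return (committed, end_offsets, lag) dicts keyed by partition number."""
    # Lazy import so the pure helpers above work without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        group_id=sink_group_id(connector_name),
        enable_auto_commit=False,  # read-only: never commit on the connector's behalf
    )
    try:
        partitions = consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in sorted(partitions)]
        # committed() fetches the group's stored offset without joining the group.
        committed = {tp.partition: consumer.committed(tp) for tp in tps}
        end = {tp.partition: off for tp, off in consumer.end_offsets(tps).items()}
        return committed, end, compute_lag(committed, end)
    finally:
        consumer.close()
```

Calling fetch_sink_offsets(["kafka1:9092"], "local-file-sink", "connect-test") would return the committed offset, log-end offset and lag for each partition. Because this consumer never subscribes or commits, it should not join the connector's group or move its offsets.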
Thanks for the help.
The new S3 Connector released by Confluent might be of interest to you.
From what you describe, it may significantly simplify your goal of exporting records from Kafka to your S3 buckets.