Properly Seek and Consume Kafka Messages on Multipartition Topic


I recently found that a topic I've been using is multi-partition rather than single-partition. I need to reconfigure my consumer class to handle the multiple partitions, but I'm a touch confused. I am currently using an offset group; let's call it test_offset_group for the sake of the example below. Under normal circumstances I always parse linearly, moving forward in time: as messages get added to the topic, I parse them and move on. In the event of a crash, or if I need to go back and re-run the feed for the previous day, I need to be able to seek by timestamp. Kafka is mandatory in this project, so I cannot change the streaming data service I'm using.

I configure my consumer like this:

from datetime import datetime
from kafka import KafkaConsumer, TopicPartition

test_consumer = KafkaConsumer("test_topic", bootstrap_servers="bootstrap_string", enable_auto_commit=False, group_id="test_offset_group")

When I need to seek to a timestamp, I provide the timestamp and then seek using the following method:

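# Poll once so the consumer joins the group and is assigned partitions
test_consumer.poll()
# Only partition 0 is handled here
tp = TopicPartition("test_topic", 0)
# Seconds since the epoch; timestamp is a datetime object
needed_date = datetime.timestamp(timestamp)
# offsets_for_times expects milliseconds since the epoch
rec_in = test_consumer.offsets_for_times({tp: needed_date * 1000})
# Move the partition's position to the first offset at or after the timestamp
test_consumer.seek(tp, rec_in[tp].offset)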

The above works perfectly for a single-partition consumer, but it feels clunky once numerous partitions are involved. I guess I could fetch the partitions using test_consumer.partitions_for_topic("test_topic") and then iterate over each of them, but that seems like I'm going against the grain of Kafka, and I feel there should be an easier way to do this.

In summary: I'd like to understand how to seek to a set of offsets across multiple partitions while using the offset_group functionality, and I'd like to confirm whether, by performing the above operation, I am effectively ignoring all partitions aside from 0.


There is 1 answer

Answered by Mickael Maison

Your logic is right; you just need to apply it to all partitions assigned to this consumer instance.

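You can retrieve the current assignment using the consumer's assignment() method. offsets_for_times() accepts a dictionary, so you can look up every assigned partition in a single call.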
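As a rough sketch of that idea, assuming the kafka-python client from the question: the helper below takes an already-created consumer, asks for the offsets of every assigned partition at once, and seeks each one. The function name seek_all_to_timestamp and the choice to jump to the end of a partition that has no message at or after the timestamp are my own assumptions, not part of the library.

```python
from datetime import datetime, timezone


def seek_all_to_timestamp(consumer, when):
    """Seek every partition assigned to `consumer` to the first offset
    whose timestamp is at or after `when` (a datetime).

    Hypothetical helper: only uses assignment(), offsets_for_times(),
    seek() and seek_to_end() on the consumer passed in.
    """
    # Kafka timestamps are integer milliseconds since the epoch.
    ts_ms = int(when.timestamp() * 1000)

    # With a group subscription, assignment() stays empty until the group
    # has assigned partitions, which normally happens during poll().
    partitions = consumer.assignment()
    if not partitions:
        raise RuntimeError("no partitions assigned yet; call poll() first")

    # One lookup covering all assigned partitions.
    found = consumer.offsets_for_times({tp: ts_ms for tp in partitions})

    for tp in partitions:
        hit = found.get(tp)
        if hit is None:
            # No message at or after the timestamp in this partition.
            # Assumption: skip to the end rather than replay old data.
            consumer.seek_to_end(tp)
        else:
            consumer.seek(tp, hit.offset)
    return found


# Usage with the consumer from the question (needs a reachable broker):
#   test_consumer.poll(timeout_ms=1000)
#   seek_all_to_timestamp(test_consumer,
#                         datetime(2024, 1, 1, tzinfo=timezone.utc))
```

Two caveats. A single poll() may not be enough for the assignment to complete, and any records that poll returns before the seek are discarded in this sketch. Also, as far as I know, seeking only partition 0 does not make the consumer ignore the others: with a topic subscription it should still receive messages from every assigned partition, just from their committed positions rather than from the timestamp.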