ConfluentKafka in Python: Using a consumer to consume a set of records


I have started learning confluent-kafka (Python). The setup is simple: 1 producer, 1 topic, 1 partition and 1 consumer. My requirement is to fetch records collectively, in batches. I read that poll(some_time) waits for the given time and returns the records as a batch/list, so I thought it would be a simple iteration, something like:

    msgs = consumer.poll(1000)
    for msg in msgs:
        do some action..........

The issue is that I can't iterate over this 'msgs' object. I scanned the documentation and realized that poll() returns a single message, so is there no way to get a list of messages? (A workaround would be to fetch frequently in subsets of the required time, then process the data collectively, but I was hoping there was another way.) The poll() iteration process appears to be different from kafka-python's.


There are 2 answers

Sara M. On BEST ANSWER

The poll() method in the Python client returns one message per call, so it doesn't inherently support batch processing. As you mentioned: "a workaround would be to frequently fetch in subsets of that required time, then process data collectively"

See this article; it may help.
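The workaround quoted above can be sketched roughly as follows. This is a minimal sketch, not an official recipe: `collect_batch`, `window_seconds`, and `max_messages` are hypothetical names chosen for illustration, and `consumer` is assumed to be an already subscribed `confluent_kafka.Consumer`. Note that in confluent-kafka the `poll()` timeout is given in seconds, so `poll(1000)` would block for up to 1000 seconds rather than one second.

```python
import time


def collect_batch(consumer, window_seconds=1.0, max_messages=500):
    """Poll repeatedly for up to window_seconds and return the payloads seen.

    `consumer` is assumed to behave like confluent_kafka.Consumer:
    poll(timeout_in_seconds) returns a single message or None.
    """
    batch = []
    deadline = time.monotonic() + window_seconds
    while len(batch) < max_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # One call returns at most one message (or None on timeout).
        msg = consumer.poll(remaining)
        if msg is None:
            continue
        if msg.error():
            # Illustrative policy only: skip error events.
            # Real code would probably log or raise here.
            continue
        batch.append(msg.value())
    return batch
```

With a real consumer, something like `for payload in collect_batch(consumer, window_seconds=1.0): ...` would then give the list-style iteration the question was after. Skipping error events is a simplification made for this sketch.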

OneCricketeer On

You need a while loop. This is shown in the documentation and in the runnable examples in the GitHub repo...

The Confluent library is implemented on top of a C/C++ client (librdkafka) rather than natively in Python like kafka-python or aiokafka, so it doesn't directly expose a native Python iterable object. The Kafka consumer protocol does do pre-fetching, though, so you're actually polling from a local queue, not making one network call per offset. See queued.min.messages - https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
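A while loop of the kind described above might look like the sketch below. It relies on `Consumer.consume(num_messages, timeout)`, which, unlike `poll()`, returns a list of messages (possibly empty when the timeout expires). The names `consume_in_batches`, `handle_batch`, and `max_empty_polls` are hypothetical helpers introduced here for illustration. They are not part of the library.

```python
def consume_in_batches(consumer, handle_batch, batch_size=100,
                       timeout=1.0, max_empty_polls=None):
    """Repeatedly fetch up to batch_size messages and pass them on as a list.

    `consumer` is assumed to behave like confluent_kafka.Consumer.
    `max_empty_polls` is an illustrative stop condition: after that many
    consecutive empty fetches the loop ends (None means run forever).
    """
    empty_polls = 0
    while max_empty_polls is None or empty_polls < max_empty_polls:
        # Returns a list with up to batch_size messages; the timeout is in seconds.
        msgs = consumer.consume(batch_size, timeout)
        if not msgs:
            empty_polls += 1
            continue
        empty_polls = 0
        # Keep only messages without an error (simplified policy).
        good = [m for m in msgs if m.error() is None]
        if good:
            handle_batch(good)
```

Assuming a consumer created along the lines of `Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'demo'})` and subscribed with `consumer.subscribe(['my-topic'])` (broker address, group id, and topic are placeholders), calling `consume_in_batches(consumer, process)` hands each list to a user-supplied `process` function. Because the messages come from the local pre-fetch queue, `queued.min.messages` can be set in the same configuration dictionary to influence how much is buffered.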

If you'd like this iterator feature to be available, search the GitHub issues. I'm sure someone else has wanted the same thing.