How to read batch messages in confluent kafka python?

I am trying to read messages from Kafka, so I have written a simple consumer.

while True:
    message = consumer.poll(timeout=1.0)
    # I am doing something with the message

In the code above, poll returns a single Message object. How can I get an array of messages instead?

Is that possible?

Note: The consumer uses only a basic configuration.

2

There are 2 answers

8
Treziac On BEST ANSWER

librdkafka (the underlying C library) only returns messages one by one to the application, but internally, messages are fetched in batches from the brokers, so there is no performance downside. Messages are queued in an internal buffer, waiting for your app to poll.

There are configurations to tune the behaviour:

- fetch.wait.max.ms (default 100): the time the broker is given to accumulate data to send
- fetch.message.max.bytes (default 1048576, i.e. 1 MB): the maximum size of batches
- queued.max.messages.kbytes (default 1000000): the maximum size of data in the internal queue. If you don't poll regularly, data won't be purged from the queue and you won't be able to fetch any more data.

You can find many others here: https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md
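As an illustration, the three settings above could be passed to the Python client roughly like this. The broker address, the group id, and the `build_consumer_config` helper are placeholders assumed for the example; the values are simply the defaults quoted above, written out explicitly, not tuning recommendations.

```python
def build_consumer_config(bootstrap_servers="localhost:9092", group_id="example-group"):
    """Return a config dict for confluent_kafka.Consumer (hypothetical helper).

    The broker address and group id are placeholders; the three tuning values
    are the defaults quoted in the answer, written out explicitly.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        # Time the broker may wait to accumulate data before answering a fetch.
        "fetch.wait.max.ms": 100,
        # Maximum size of a fetched batch, in bytes (1048576 bytes = 1 MiB).
        "fetch.message.max.bytes": 1048576,
        # Maximum size of the internal queue, in kilobytes.
        "queued.max.messages.kbytes": 1000000,
    }


# With the client installed, the dict is passed straight to the consumer:
#   from confluent_kafka import Consumer
#   consumer = Consumer(build_consumer_config())
```

Since these are the defaults, a real configuration would normally list only the values it actually changes.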


If you really want an array of messages because of the way you process data, you can call poll with a low timeout in a loop, as you already do, and stop the loop once you have x messages or after y ms, accumulating them in a collection. Process the resulting array and repeat the loop.
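The loop described above could be sketched roughly as follows. This is only a sketch under assumptions: `poll_batch`, `max_messages`, `max_wait_s` and `poll_timeout_s` are names made up for illustration (the first two stand in for the x and y above), and `consumer` is assumed to be an already subscribed confluent_kafka Consumer.

```python
import time


def poll_batch(consumer, max_messages=100, max_wait_s=1.0, poll_timeout_s=0.1):
    """Collect up to max_messages from consumer.poll(), waiting at most max_wait_s.

    Hypothetical helper, not part of confluent-kafka. `consumer` is assumed to
    be a subscribed confluent_kafka.Consumer (anything with a compatible
    poll(timeout) method works).
    """
    batch = []
    deadline = time.monotonic() + max_wait_s
    while len(batch) < max_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # time budget used up: return whatever we have
        message = consumer.poll(timeout=min(poll_timeout_s, remaining))
        if message is None:
            continue  # nothing arrived within this poll, keep waiting
        if message.error():
            # Error events are skipped here; real code should log or handle them.
            continue
        batch.append(message)
    return batch
```

The outer loop then becomes "call poll_batch(consumer, ...), process the returned list, repeat". The list may be shorter than max_messages, or empty, when the time budget runs out first.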

The same goes for producing: you produce data one by one, but messages are batched before being sent to brokers.
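The producing side might look like the sketch below. `produce_all` is a hypothetical helper, and `producer` is assumed to be a confluent_kafka Producer. Each produce() call only enqueues one message locally; the client library groups queued messages into batches before sending them to the brokers.

```python
def produce_all(producer, topic, values):
    """Enqueue values one by one; the client batches them before sending.

    Hypothetical helper. `producer` is assumed to be a confluent_kafka.Producer.
    """
    count = 0
    for value in values:
        # produce() is asynchronous: it only appends to the local queue.
        producer.produce(topic, value=value)
        # Serve delivery callbacks without blocking.
        producer.poll(0)
        count += 1
    # Block until every queued message has been delivered or has failed.
    producer.flush()
    return count
```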

0
user192344 On

I think you can just use consumer.consume(batch_size, timeout=1), which returns a list of messages:

batch_size = 10

while True:
    # Fetch up to batch_size messages, waiting at most 1 second
    messages = consumer.consume(batch_size, timeout=1)

    if not messages:
        # No messages received (consume() returns an empty list on timeout)
        continue

    for message in messages:
        if message.error():
            print(f"Error consuming message: {message.error()}")
        else:
            print(f"Received message: {message.value()}")