I am trying to read messages from Kafka, so I have written a simple consumer:
    while True:
        message = consumer.poll(timeout=1.0)
        # I am doing something with the message here
In the code above, poll returns a single message object. How can I get an array of messages instead?
Is that possible?
Note: the consumer uses only basic configuration, nothing special.
librdkafka (the underlying C library) only returns messages one by one to the application, but internally messages are fetched in batches from the brokers, so there is no performance downside. Messages are queued in an internal buffer, waiting for your app to poll.
There are configurations to tune the behaviour:
fetch.wait.max.ms (default 100): the time given to the broker to accumulate data to send
fetch.message.max.bytes (default 1048576, i.e. 1 MB): the maximum size of batches
queued.max.messages.kbytes (default 1000000): the maximum size of data in the internal queue. If you don't poll regularly, data won't be purged from the queue and you won't be able to fetch any more data.

And many others that you can find here: https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md
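
For illustration, here is roughly how these properties could be passed as a configuration dictionary. This is only a sketch: it assumes the Python client is confluent-kafka-python (the question does not say so explicitly), and the broker address and group id are placeholder values, not taken from the question. The three tuning values are simply the defaults quoted above.

```python
# Sketch of a consumer configuration, assuming confluent-kafka-python.
consumer_config = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address (assumption)
    "group.id": "example-group",            # placeholder consumer group (assumption)
    # Time the broker is given to accumulate data before answering a fetch.
    "fetch.wait.max.ms": 100,
    # Maximum size of a fetched batch, in bytes (1048576 bytes = 1 MB).
    "fetch.message.max.bytes": 1048576,
    # Maximum size of the internal queue, in kilobytes.
    "queued.max.messages.kbytes": 1000000,
}

# With confluent-kafka-python installed, the dictionary would be used like this:
# from confluent_kafka import Consumer
# consumer = Consumer(consumer_config)
```

Defaults differ between librdkafka versions, so it is worth checking the CONFIGURATION.md that matches the version you have installed before copying these numbers.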
If you really want an array of messages because of the way you process data, you can call poll with a low timeout in a loop, as you already do, accumulate the messages in a collection, and stop the loop once you have x messages or after y ms. Then process the accumulated array and repeat the loop.
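
The accumulate-then-process loop described above could look something like the sketch below. The function name poll_batch and its parameters (max_messages for x, max_wait_s for y, poll_timeout_s for the low per-call timeout) are made up for this example. The only thing it assumes about the consumer is a poll(timeout=...) method that returns None when nothing arrived in time, as in the question's code.

```python
import time


def poll_batch(consumer, max_messages=100, max_wait_s=1.0, poll_timeout_s=0.1):
    """Collect messages from consumer.poll() until max_messages are
    gathered or max_wait_s seconds have elapsed, whichever comes first."""
    batch = []
    deadline = time.monotonic() + max_wait_s
    while len(batch) < max_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # time budget used up: return what we have so far
        # Use a short timeout so the deadline is checked regularly.
        message = consumer.poll(timeout=min(poll_timeout_s, remaining))
        if message is None:
            continue  # nothing arrived within this short poll
        # Error handling is left to the caller in this sketch
        # (e.g. checking message.error() on each item of the batch).
        batch.append(message)
    return batch
```

You would then call batch = poll_batch(consumer, max_messages=500, max_wait_s=2.0) inside your outer while True loop and process the returned list. If you are on confluent-kafka-python, newer releases also appear to offer Consumer.consume(num_messages, timeout), which returns a list directly; check whether your installed version has it before relying on it.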
The same goes for producing: you produce data one by one, but messages are batched before being sent to brokers.