I want to pop messages from a KafkaStream and process them. As KafkaStream implements java.lang.Iterable, I thought I could just put the stream into a doseq and it would consume the messages as they come in. However, what actually happens is that a message is only passed to the body of the doseq when another message is added to the KafkaStream, 'pushing' it out.
(doseq [msg (map unpack-message ^KafkaStream stream)]
  (handler msg))
What I have to do instead is what lazy-iterate does in https://github.com/pingles/clj-kafka/blob/master/src/clj_kafka/consumer/zk.clj and build up a seq from the iterator myself. This works, but I have no idea why. Could someone explain?
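For reference, the workaround looks roughly like the sketch below. It is modelled on the lazy-iterate name from the linked file, but the body is an approximation written for this question and not a verbatim copy of the clj-kafka source. The demo at the end uses a plain vector iterator, not a real Kafka stream.

```clojure
;; Sketch only: build a lazy seq directly from a java.util.Iterator.
;; Each element is pulled from the iterator only when that part of the
;; seq is realized.
(defn lazy-iterate
  [^java.util.Iterator it]
  (lazy-seq
    (when (.hasNext it)
      (cons (.next it) (lazy-iterate it)))))

;; Stand-in demo with an ordinary collection instead of a KafkaStream:
(doseq [x (map inc (lazy-iterate (.iterator [1 2 3])))]
  (println x))
```

With the real stream, the idea would be to pass (lazy-iterate (.iterator stream)) to map in place of the stream itself, assuming stream, unpack-message and handler are bound as in the snippet above.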
Another, even weirder thing is that if I put a log line in the unpack-message function, I see that it is evaluated as soon as the message comes in. So the message is unpacked by the lazy map, but it is not picked up by the doseq, even though it is sitting in there waiting. I really don't understand.