doseq with KafkaStream evaluates previous item


I want to pop messages from a KafkaStream and process them. Since KafkaStream implements java.lang.Iterable, I thought I could pass the stream to doseq and it would consume messages as they arrive. Instead, a message is only passed to the body of the doseq once another message is added to the KafkaStream, 'pushing' it out.

(doseq [msg (map unpack-message ^KafkaStream stream)]
  (handler msg))

What I have to do instead is what lazy-iterate does in https://github.com/pingles/clj-kafka/blob/master/src/clj_kafka/consumer/zk.clj: build up a seq from the stream's iterator. This works, but I have no idea why. Could someone explain?
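For reference, the workaround looks roughly like the sketch below. It is written from memory of the idea behind lazy-iterate, not copied from clj-kafka, and it uses an ArrayList in place of a real KafkaStream so that it runs on its own. The unpack-message body and the handled atom are hypothetical stand-ins for my real unpacking function and handler.

```clojure
;; Sketch of the lazy-iterate idea: wrap a java.util.Iterator in a lazy seq.
;; .hasNext is only called when the next element is actually requested,
;; not at the moment the previous element is handed out.
(defn lazy-iterate
  [^java.util.Iterator it]
  (lazy-seq
    (when (.hasNext it)
      (cons (.next it) (lazy-iterate it)))))

;; Hypothetical stand-in for the real unpack-message function.
(defn unpack-message [m]
  (str "unpacked-" m))

;; Stand-in for the KafkaStream: any java.lang.Iterable works for the sketch.
(def messages (java.util.ArrayList. ["a" "b" "c"]))

;; Hypothetical handler: just records what it was given.
(def handled (atom []))

(doseq [msg (map unpack-message (lazy-iterate (.iterator messages)))]
  (swap! handled conj msg))
```

With the real stream I pass (.iterator stream) in place of the ArrayList's iterator, and the handler then sees each message as soon as it arrives.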

An even weirder thing is that if I put a log line in the unpack-message function, I see that it is evaluated as soon as the message comes in. So the message is unpacked inside the lazy map, but it is not passed to the doseq body, even though it is sitting there waiting. I really don't understand.


There are 0 answers