Reactor Kafka with spring-boot 3 not closing a inner flux on flatMap

45 views Asked by At

Below is the sample code used to subscribe to kafka with project reactor. This code worked with spring boot 2.

With spring boot 3 below code is not working.

public  Flux<ReceiverRecord<String, String>> getKafkaReceiver(){
            //import reactor.kafka.receiver.KafkaReceiver;
            KafkaReceiver.create(receiverOptions(Collections.singletonList("mytopic"))
                        .addAssignListener(partitions -> partitions.forEach(ReceiverPartition::seekToBeginning))
                        .commitInterval(Duration.ZERO) // no auto commit
                        ).receive();
}

           Scheduler scheduler = Schedulers.newSingle("trans-store", true);
           AtomicInteger counter = new AtomicInteger();
           Flux<ReceiverRecord<String,String>> receiver = getKafkaReceiver()
                        .takeUntil(r -> counter.get() == getmessageCount()) //getmessagecount will provice the number of messages in the topic accross partition
                        .doOnNext(r -> { counter.getAndIncrement();log.info(" counter - {}",counter.get());})  //using lomobok for log
                        .groupBy(msg -> msg.receiverOffset().topicPartition())
                        .log()
                        .flatMap(innerFlux -> innerFlux
                                .publishOn(scheduler)
                                // inside the partition, take until message offset equals endOffset - 1
                                .takeUntil(m -> m.offset() >= getLatestOffsetForTheTopic().get(innerFlux.key())) // getLatestOffsetForTheTopic() method contains map of partition based offset
                                .doOnNext(message -> {
                                    log.info(">>>> partition offset {} >= counter {} ", message.offset(), getLatestOffsetForTheTopic().get(innerFlux.key()));
                                    // processing the message 
                                })
                                .doOnComplete(() -> log.info("done partition {}", innerFlux.key().partition()))
                        )
                        .doOnComplete(this::createBean)  // creating a bean 
                        .doFinally(s -> {scheduler.dispose();})
                        .doOnCancel(s -> {scheduler.dispose();});
                        
        receiver.subscribe();

I am subscribing to the kafka topic, from the beginning and trying to scan through the messages in different topic and start consuming messages from the very last offset on each topic. (I could have used the offset to seek end/last or earliest), but this was implemented for an use case.

The reactor-core - 3.5.10 and reactor-kafka - 1.3.21, after subscribed the outer doOnComplete() that creates the bean is not reached.

With previous version of reactor the second doOnComplete() got invoked (with the spring boot 2).

When troubleshooting the flow, noticed the flow waits after doOnComplete() of the innerFlux since the log message is getting printed. Not moving further.

One observation is the inner flux is started in a separate thread than the main flux, assuming that is blocking the main thread to move further.

Any idea, how to close the innerFlux and invoke the outer doOnComplete() that creates the bean in this case?

0

There are 0 answers