Implement Reactive Kafka Listener in Spring Boot application

8.9k views Asked by At

I'm trying to implement reactive kafka consumer in my Spring boot application and I'm looking at these examples: https://github.com/reactor/reactor-kafka/blob/master/reactor-kafka-samples/src/main/java/reactor/kafka/samples/SampleScenarios.java

and it looks like there is no support for Spring in reactive kafka yet

I understand how kafka listeners work in non-reactive kafka API in Spring: simplest solution is to configure beans for ConcurrentKafkaListenerContainerFactory and ConsumerFactory, then use @KafkaListener annotation and voila

But I'm not sure how to properly use reactive kafka in Spring right now.

Basically I need a listener for topic. Should I create some kind of loop or scheduler of my own? Or maybe I'm missing something. Can anyone share their knowledge and best practices?

1

There are 1 answers

2
Pawel On

I don't have a ready solution yet but i'm trying this (Kotlin code, Spring Boot). Someone published part of this code snippet here https://github.com/reactor/reactor-kafka/issues/100

@EventListener(ApplicationStartedEvent::class)
fun onSomeEvent() {
    kafkaReceiver
        .receive()
        .doOnNext { record ->
            val myEvent = record.value()
            processMyEvent(myEvent).thenEmpty {
                record.receiverOffset().acknowledge()
            }
        }
        .doOnError {
            /* todo */
        }
        .subscribe()
}

Look into other stack overflow questions. There is not much there, but maybe will give you some ideas