Reactive and Non-Blocking Method Micronaut with apache kafka

795 views Asked by At

I am trying to get the Non-Blocking response from the Micronaut kafka implementation, however the return value in not working.

public class ProductManager implements IProductManager{
    private final ApplicationContext applicationContext;
    
    public ProductManager(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    @Override
    public ProductViewModel findFreeText(String text) {
       
        final ProductViewModel model = new ProductViewModel();
        IProductProducer client = applicationContext.getBean(IProductProducer.class);
         client.findFreeText(text).subscribe(item -> {
          System.out.println(item);
        });
         return model;
    }
}

The subscribe method is not working, the debugger never comes to this point. I want to get the value back from the kafka listener

kafka producer

@KafkaClient
public interface IProductProducer {
    
    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
    Flowable<ProductViewModel> findFreeText(String text);
}

Kafka Listener

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
public class ProductListener {
    private static final Logger LOG = LoggerFactory.getLogger(ProductListener.class);

    @Topic(ProductTopicConstants.GET_FREE_TEXT_SEARCH)
    public Flowable<Product>> findByFreeText(String text) {
        LOG.info("Listening value = ", text);
        
        return Flowable.just(new Product("This is the test", 0,"This is test description"));
    }
}

Micronaut documentation for non-blocking method

https://docs.micronaut.io/1.0.0.M3/guide/index.html#_reactive_and_non_blocking_method_definitions

0

There are 0 answers