Kafka Producer: how to code onSuccess/onError callbacks with a Reactive and Non-Blocking Producer

I am trying to follow the Micronaut Kafka guide. It shows this piece of code:

Single<Book> sendBook(
    @KafkaKey String author,
    Single<Book> book
);

I tried to implement it this way, without success:

Producer

package com.tolearn.producer

import io.micronaut.configuration.kafka.annotation.KafkaClient
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Header
import io.reactivex.Single

@KafkaClient(
        id = "demo-producer",
        acks = KafkaClient.Acknowledge.ALL)
@Header(name = "X-Token", value = "\${my.application.token}")
interface DemoProducer {

    //Reactive and Non-Blocking
    @Topic("demotopic")
    fun sendDemoMsgReactive(
            @KafkaKey key: String?,
            msg: Single<String>): Single<String?>?
}

And I call it from the service layer like this:

package com.tolearn.service

import com.tolearn.producer.DemoProducer
import io.reactivex.Single
import io.reactivex.SingleOnSubscribe
import javax.inject.Inject
import javax.inject.Named
import javax.inject.Singleton

@Singleton
class DemoService {
    @Inject
    @Named("dp")
    lateinit var dp : DemoProducer

    fun postDemo(key: String, msg: String){

        //Reactive and Non-Blocking
        val singleReturned:Single<String> = dp.sendDemoMsgReactive(key, SingleOnSubscribe<String> msg ).subscribe()

        singleReturned.doOnSuccess{
            print("ok")
        }
        singleReturned.doOnError ((e)->print(e))
    }
}

Basically, what I want is to post a message to Kafka in a non-blocking style using io.reactivex.Single. I understand that I must subscribe and then code two callbacks: onSuccess and onError. I am certainly missing some basic ReactiveX concepts. Any clue would be appreciated.
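
For reference, a minimal sketch of the subscribe-with-two-callbacks pattern described above, assuming RxJava 2 (io.reactivex) is on the classpath. The DemoProducer here is a hand-written stand-in with non-null types, not the Micronaut-generated client, and EchoProducer, FailingProducer and the log parameter are hypothetical names added only so the example runs without a broker. Two points it relies on: a plain String can be wrapped with Single.just(msg), and subscribe(onSuccess, onError) returns a Disposable rather than a Single, so the callbacks are passed to subscribe directly instead of being chained afterwards with doOnSuccess/doOnError.

```kotlin
import io.reactivex.Single
import io.reactivex.disposables.Disposable

// Hypothetical stand-in for the @KafkaClient interface, so the sketch runs without a broker.
interface DemoProducer {
    fun sendDemoMsgReactive(key: String?, msg: Single<String>): Single<String>
}

// Hypothetical test double: hands the payload straight back, as if the send succeeded.
class EchoProducer : DemoProducer {
    override fun sendDemoMsgReactive(key: String?, msg: Single<String>): Single<String> = msg
}

// Hypothetical test double: always fails, to exercise the error callback.
class FailingProducer : DemoProducer {
    override fun sendDemoMsgReactive(key: String?, msg: Single<String>): Single<String> =
        Single.error(IllegalStateException("broker unavailable"))
}

class DemoService(
    private val dp: DemoProducer,
    // Hypothetical logging hook (defaults to stdout); not part of the original service.
    private val log: (String) -> Unit = { println(it) }
) {
    fun postDemo(key: String, msg: String): Disposable =
        dp.sendDemoMsgReactive(key, Single.just(msg)) // wrap the plain String in a Single
            .subscribe(
                { sent -> log("ok: $sent") },           // onSuccess callback
                { e -> log("failed: ${e.message}") }    // onError callback
            )
}

fun main() {
    DemoService(EchoProducer()).postDemo("k1", "hello")
    DemoService(FailingProducer()).postDemo("k1", "hello")
}
```

With the real Micronaut client the callbacks would presumably fire later, once the send completes or fails; here the stand-ins complete synchronously. The returned Disposable can be kept if the subscription might need to be cancelled.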
