I am trying to follow the Micronaut Kafka guide. It shows this piece of code:
    Single<Book> sendBook(
        @KafkaKey String author,
        Single<Book> book
    );
I tried to implement it as follows, without success.
Producer:
    package com.tolearn.producer

    import io.micronaut.configuration.kafka.annotation.KafkaClient
    import io.micronaut.configuration.kafka.annotation.KafkaKey
    import io.micronaut.configuration.kafka.annotation.Topic
    import io.micronaut.messaging.annotation.Header
    import io.reactivex.Single

    @KafkaClient(
        id = "demo-producer",
        acks = KafkaClient.Acknowledge.ALL)
    @Header(name = "X-Token", value = "\${my.application.token}")
    public interface DemoProducer {

        // Reactive and non-blocking
        @Topic("demotopic")
        fun sendDemoMsgReactive(
            @KafkaKey key: String?,
            msg: Single<String>): Single<String?>?
    }
I call it from the service layer like this:
    package com.tolearn.service

    import com.tolearn.producer.DemoProducer
    import io.reactivex.Single
    import io.reactivex.SingleOnSubscribe
    import javax.inject.Inject
    import javax.inject.Named
    import javax.inject.Singleton

    @Singleton
    class DemoService {

        @Inject
        @Named("dp")
        lateinit var dp : DemoProducer

        fun postDemo(key: String, msg: String){
            // Reactive and non-blocking
            val singleReturned:Single<String> = dp.sendDemoMsgReactive(key, SingleOnSubscribe<String> msg ).subscribe()
            singleReturned.doOnSuccess{
                print("ok")
            }
            singleReturned.doOnError ((e)->print(e))
        }
    }
Basically, what I want is to post a message to Kafka in a non-blocking style using io.reactivex.Single. I understand that I must subscribe and then provide two callbacks: onSuccess and onError. I am certainly missing some basic ReactiveX concepts. Any clue would be appreciated.
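To make the question concrete, this is roughly the subscribe pattern I have in mind. It is only a minimal sketch, assuming RxJava 2 (io.reactivex) is on the classpath. fakeSend is a hypothetical stand-in for the Kafka producer so that the snippet runs without a broker; it is not part of Micronaut.

```kotlin
import io.reactivex.Single

// Hypothetical stand-in for the Kafka producer (assumption, not a Micronaut API):
// it combines the key with the message once the message Single emits.
fun fakeSend(key: String, msg: Single<String>): Single<String> =
    msg.map { body -> "$key:$body" }

fun main() {
    // Wrap the plain String in a Single, then subscribe with two callbacks.
    fakeSend("k1", Single.just("hello"))
        .subscribe(
            { sent -> println("ok: $sent") },   // onSuccess callback
            { e -> println("failed: $e") }      // onError callback
        )
}
```

Here the plain String is wrapped with Single.just before it is passed in, and both callbacks go to a single subscribe call instead of being attached after subscribing.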