Versions:
Scala Version: 2.12.9, monix/eval: 2.3.2, monix/reactive: 2.3.2
Backdrop:
I have two services serviceA and serviceB to call from a Monix observable. The program is as follows.
import monix.reactive.subjects.PublishToOneSubject
import monix.eval.Task
import monix.execution.Scheduler
import monix.execution.Ack.Continue
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSS");
def timestamp = formatter.format(LocalDateTime.now)
def log = s"[$timestamp] [${Thread.currentThread.getName}]"
println(s"$log Program Starts.")
private def serviceA(num: Int) = Task {
println(s"$log A: ${num}")
Thread.sleep(1000)
num * 10
}
private def serviceB(num: Int) = Task {
println(s"$log B: ${num}")
Thread.sleep(5 * 1000)
num * 0.5
}
private val aCtx = Scheduler.fixedPool("aCtx", 2)
private val bCtx = Scheduler.fixedPool("bCtx", 2)
val stream: PublishToOneSubject[Int] = PublishToOneSubject[Int]
stream
.doOnSubscribe( () => println(s"$log Stream is ready!"))
.doOnNext {
v => println(s"$log Event: ${v}")
}
.mapAsync(parallelism=3) {
v =>
serviceA(v)
.zip(serviceB(v).executeOn(bCtx))
.map(_._1)
}
.subscribe(
nextFn = { v =>
println(s"$log Result: ${v}")
Continue
}
)(aCtx)
(1 to 10).foreach {
i => stream.onNext(i)
}
Thread.sleep(30 * 1000)
println(s"$log Program Ends.")
Question:
How to ensure that any latency in calling serviceB doesn't impact the stream and it continues to process events with serviceA and return results?
Though I need to call serviceB, I am only interested in result of serviceA. I don't want any slowness in calling serviceB to impact ServiceA. Using zip delays the result if there is a latency in serviceB call.