I'm playing with Monix streams and have an example where I build an Observable from an Iterator. It seems to me that, when run, it produces one more element than I'd expect. The following code shows this:
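// Imports this snippet appears to rely on (not shown in the original post)
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.atomic.AtomicLong
import monix.reactive.{Consumer, Observable}
import scala.concurrent.duration._

val count = AtomicLong(0)

// Increments the counter and logs each value as it is produced
def produceValue(): Long = {
  count.transformAndGet { i =>
    logger.info(s"Producing value: ${i + 1}")
    i + 1
  }
}

def more(): Boolean = count.get < 20

lazy val iter = new Iterator[Long] {
  override def hasNext: Boolean = more()
  override def next(): Long = produceValue()
}

Observable
  .fromIterator(iter)
  .mapParallelUnordered(5) { x =>
    Task(x)
      .foreachL { x =>
        logger.info(s"Transforming $x")
      }
      .delayExecution(3.seconds)
  }
  .consumeWith(Consumer.complete)
  .runAsync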
The case is quite simple. There is an Iterator that logs a message every time it produces a next value. The downstream stage is a simple delayed task run with a parallelism of 5, to see what's happening. The output is as follows:
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 1
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 2
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 3
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 4
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 5
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 4
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 3
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 5
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 2
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 1
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 17
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 17
As you can see, the stream initially produces 6 elements, while I'd expect only 5 (as the downstream stage mapParallelUnordered takes only 5 elements at a time). That's not a big deal, but I just want to understand why it is so.
Also, why are the initial values produced on the main thread while subsequent ones are produced on the execution-context thread pool? Shouldn't they all use the scheduler that runs the entire stream?
The low-level communication protocol is designed around a Subscriber and its (inherited) method onNext (source), which returns an acknowledgement telling the upstream when it may send the next element.
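For reference, the contract has roughly the following shape. This is a simplified, stand-alone sketch and not the Monix source: `Ack`, `Continue`, `Stop` and `SketchObserver` are local stand-ins for what Monix provides in `monix.execution.Ack` and `monix.reactive.Observer`, and `PrintingObserver` is a hypothetical example.

```scala
import scala.concurrent.Future

// Local stand-ins for Monix's acknowledgement values (assumption: simplified).
sealed trait Ack
case object Continue extends Ack
case object Stop extends Ack

// Simplified shape of the observer contract: onNext returns a Future[Ack],
// so the downstream decides when the upstream may push the next element.
trait SketchObserver[-A] {
  def onNext(elem: A): Future[Ack]
  def onError(ex: Throwable): Unit
  def onComplete(): Unit
}

// Hypothetical observer that accepts every element immediately.
final class PrintingObserver extends SketchObserver[Long] {
  def onNext(elem: Long): Future[Ack] = {
    println(s"got $elem")
    Future.successful(Continue) // already completed: upstream may continue at once
  }
  def onError(ex: Throwable): Unit = println(s"failed: ${ex.getMessage}")
  def onComplete(): Unit = println("done")
}
```

An already-completed `Continue` lets the source push the next element without waiting; a future that is still pending is how a busy stage applies back-pressure.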
If we picture creation and transformation each being a stage, the source observable (fromIterator in your case) pushes its value down to subscribers and, when acknowledged, pushes the next one. So what happens is:

1. The fromIterator stage generates value 1.
2. The value is passed to the mapAsyncUnordered stage, where it is accepted (because there are free workers), so the acknowledgement is to immediately Continue.
3. The same happens for values 2 through 5.
4. The fromIterator stage generates value 6 (this is when you see the output).
5. The value is passed to the mapAsyncUnordered stage. This time it cannot be accepted immediately, so the acknowledgement is to Continue some time later. Until then, no more values are generated by fromIterator.

The point to note is that it is not the mapAsyncUnordered stage that pulls values out of fromIterator; rather, fromIterator generates these values on its own, and it cannot know in advance whether the downstream transformation will accept a value immediately or not.

Monix Observable tries to work synchronously as much as possible for performance reasons (switching threads is expensive). In general, unless you control it explicitly with methods like executeAsync, executeOn, etc., you won't be able to tell whether an operation will execute on the same thread or not.
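Both effects can be reproduced with a small simulation of the push protocol. This is a minimal sketch using only the Scala standard library; it is not how Monix is implemented, and `Ack`, `BoundedStage`, `feed` and `PushDemo` are hypothetical names introduced for illustration. The source keeps pushing on the calling thread for as long as each acknowledgement is already complete, and stops at the first pending one.

```scala
import scala.concurrent.{Future, Promise}
import scala.collection.mutable.ArrayBuffer

// Local stand-ins for the acknowledgement values (assumption: simplified).
sealed trait Ack
case object Continue extends Ack
case object Stop extends Ack

object PushDemo {
  // Records each produced element together with the producing thread's name.
  private val producedOn = ArrayBuffer.empty[(Long, String)]

  // Hypothetical downstream stage with a fixed number of free workers.
  // Workers are never released here, so the demo ends at the first pending ack.
  final class BoundedStage(workers: Int) {
    private var busy = 0
    private val pending = Promise[Ack]()

    def onNext(elem: Long): Future[Ack] =
      if (busy < workers) { busy += 1; Future.successful(Continue) } // accepted at once
      else pending.future                                            // back-pressure
  }

  // Source loop: pushes synchronously while acks are already-completed Continues.
  // A real source would register a callback on the pending ack and resume from
  // whichever thread completes it, which is why later elements can appear on
  // pool threads.
  def feed(iter: Iterator[Long], stage: BoundedStage): Unit = {
    var keepGoing = true
    while (keepGoing && iter.hasNext) {
      val elem = iter.next()
      producedOn += ((elem, Thread.currentThread().getName))
      val ack = stage.onNext(elem)
      keepGoing = ack.value.exists(_.toOption.contains(Continue))
    }
  }

  def run(): List[(Long, String)] = {
    producedOn.clear()
    feed(Iterator.range(1, 21).map(_.toLong), new BoundedStage(workers = 5))
    producedOn.toList
  }
}
```

Calling `PushDemo.run()` yields elements 1 through 6, all recorded on the calling thread: five are accepted immediately, and the sixth has to be produced before the source can learn that the stage is full.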