I'm playing with Monix streams and got the example where I build Observable from Iterator. It seems to me like when run it produces 1 more element than I'd expect. The following code shows that:
val count = AtomicLong(0)
def produceValue(): Long = {
count.transformAndGet { i =>
logger.info(s"Producing value: ${i + 1}")
i + 1
}
}
def more(): Boolean = count.get < 20
lazy val iter = new Iterator[Long] {
override def hasNext: Boolean = more()
override def next(): Long = produceValue()
}
Observable
.fromIterator(iter)
.mapParallelUnordered(5) { x =>
Task(x)
.foreachL { x =>
logger.info(s"Transforming $x")
}
.delayExecution(3.seconds)
}
.consumeWith(Consumer.complete)
.runAsync
The case is quite simple. There is Iterator that prints log every time it produces a next value. Downstream stage is simple delayed task run in parallel count of 5 to see what's happening. Now the output is as follows:
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 1
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 2
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 3
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 4
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 5
[INFO ] c.s.f.a.t.MonixSandbox$ [main] - Producing value: 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 4
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 3
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 5
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 2
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 1
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Producing value: 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-22] - Transforming 7
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 6
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-21] - Transforming 9
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 8
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 10
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Producing value: 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 11
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 13
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 12
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 14
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-26] - Transforming 15
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 17
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Producing value: 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-20] - Transforming 16
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-18] - Transforming 20
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-27] - Transforming 18
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-17] - Transforming 19
[INFO ] c.s.f.a.t.MonixSandbox$ [scala-execution-context-global-19] - Transforming 17
As you can see, initially the stream produces 6 elements while I'd expect 5 only (as the downstream stage mapParallelUnordered takes only 5 elements. Actually that's not a big deal, but I just want to understand why is it so.
Also why the initial values are produced in main thread while subsequent ones are invoked on execution-context thread pool? Shouldn't all be using scheduler that is used to run entire stream?
The low-level communication protocol is designed around a
Subscriberand its (inherited) method onNext with the following signature:(source)
If we picture creation and transformation each being a stage, the source observable (
fromIteratorin your case) pushes its value down to subscribers, and, when acknowledged, pushes the next one.So what happens is:
fromIteratorstage generates value 1mapAsyncUnorderedstage, where it is accepted (b/c there are free workers), so the acknowledgement is to immediatelyContinuefromIteratorstage generates value 6 (this is when you see the output)mapAsyncUnorderedstage. This time, it cannot be accepted immediately, so the acknowledgement is toContinuesome time later. Until that, no more values are generated byfromIterator.What's to note is that it's not
mapAsyncUnorderedstage that pulls the value out offromIterator, but thatfromIteratorgenerates these values on its own, and it cannot know in advance if the downstream transformation will accept the value immediately or not.Monix Observable tries to work synchronously as much as possible for performance reasons (switching threads is expensive). In general, unless controlled explicitly by methods like
executeAsync,executeOn, etc., you won't be able to tell whether operation will execute on same thread or not.