How in Reactive x (ideally with examples in RxJava or RxJs) can be achieved this ?
a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)
a
is an infinite stream of events which trigger finite stream sn
of events each of which should be part of infinite stream S
while being able to subscribe to each sn
stream ( in order to do summation operations) but at the same time keeping stream S
as infinite.
EDIT: To be more concrete I provide the implementation of what I am looking for in Kotlin.
Every 10 second an event is emitted which maps to shared finite stream of 4 events. The metastream is flatMap
-ed into normal infinite stream. I make use of doAfterNext
to additionally subscribe to each finite stream and print out results.
/** Creates a finite stream with events
* $ch-1 - $ch-4
*/
fun createFinite(ch: Char): Observable<String> =
Observable.interval(1, TimeUnit.SECONDS)
.take(4)
.map({ "$ch-$it" }).share()
fun main(args: Array<String>) {
var ch = 'A'
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.doAfterNext {
it
.count()
.subscribe({ c -> println("I am done. Total event count is $c") })
}
.flatMap { it }
.subscribe { println("Just received [$it] from the infinite stream ") }
// Let main thread wait forever
CountDownLatch(1).await()
}
However I am not sure if this is the 'pure RX' way.
You don't make clear how you want to do the counting. If you are doing a total count, then there is no need to do the interior subscription:
On the other hand, if you need to provide a count for each intermediate observable, then you can move the counting inside the
flatMap()
and print out the count and reset it on completion:This isn't very functional, but this kind of reporting tends to break normal streams.