Convert infinite stream of finite streams to an infinite stream - Reactive X

660 views Asked by At

How in Reactive x (ideally with examples in RxJava or RxJs) can be achieved this ?

a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2                       |-x-x-x-x-x-| (subscribe)
s2                                               |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)

a is an infinite stream of events which trigger finite stream sn of events each of which should be part of infinite stream S while being able to subscribe to each sn stream ( in order to do summation operations) but at the same time keeping stream S as infinite.

EDIT: To be more concrete I provide the implementation of what I am looking for in Kotlin. Every 10 second an event is emitted which maps to shared finite stream of 4 events. The metastream is flatMap-ed into normal infinite stream. I make use of doAfterNext to additionally subscribe to each finite stream and print out results.

/** Creates a finite stream with events
 * $ch-1 - $ch-4
 */
fun createFinite(ch: Char): Observable<String> =
        Observable.interval(1, TimeUnit.SECONDS)
                .take(4)
                .map({ "$ch-$it" }).share()

fun main(args: Array<String>) {

    var ch = 'A'

    Observable.interval(10, TimeUnit.SECONDS).startWith(0)
            .map { createFinite(ch++) }
            .doAfterNext {
                it
                        .count()
                        .subscribe({ c -> println("I am done. Total event count is $c") })
            }
            .flatMap { it }
            .subscribe { println("Just received [$it] from the infinite stream ") }

    // Let main thread wait forever
    CountDownLatch(1).await()
}

However I am not sure if this is the 'pure RX' way.

1

There are 1 answers

0
Bob Dalgleish On

You don't make clear how you want to do the counting. If you are doing a total count, then there is no need to do the interior subscription:

AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it }
        .doOnNext( counter.incrementAndget() )
        .subscribe { println("Just received [$it] from the infinite stream ") }

On the other hand, if you need to provide a count for each intermediate observable, then you can move the counting inside the flatMap() and print out the count and reset it on completion:

AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it
                     .doOnNext( counter.incrementAndget()
                     .doOnCompleted( { long ctr = counter.getAndSet(0)
                                        println("I am done. Total event count is $ctr")
                                     } )
        .subscribe { println("Just received [$it] from the infinite stream ") }

This isn't very functional, but this kind of reporting tends to break normal streams.