Memory management with RxJS Observable.concatMap?

754 views Asked by At

I'm new to RxJS and so still learning how to use the library. The documentation for concatMap gives the following warning:

Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.

This is a problem for me because I have a memory intensive but fast concatMap feeding a slow concatMap. It's set up like this:

let uploadObs = Observable.range(0, blockCount).concatMap(blockIndex => {
    // This part is fast and memory intensive. I'd like to use
    // a bounded buffer here or something similar to control
    // memory utilization

    let blockReaderObs = ...;
    // ... read a block from a large file object in blockReaderObs
    return blockReaderObs;
}).concatMap((blockData, index) => {
    // This part involves a POST so is much slower than reading a
    // file block
    let objFromBlockData = someTransformation(blockData);
    return this.http.post(someUrl, objFromBlockData)
        .map(transformResponse);
});

What is the right approach to dealing with this kind of problem in RxJS?

2

There are 2 answers

2
Asti On

This is a classic producer-consumer problem. You can use backpressure operators to limit the number of elements being sent in for processing. See controlled streams.

0
ckovacs On

I had a similar issue using concatMap and concatAll. I realized that I was creating way too many observables that were waiting to be subscribed to.

This answer in a separate SO discussion was helpful in writing my own solution to limit how much I would put into a concatMap:

https://stackoverflow.com/a/40845089/181961