I'm new to RxJS and still learning how to use the library. The documentation for concatMap
gives the following warning:
> Warning: if source values arrive endlessly and faster than their corresponding inner Observables can complete, it will result in memory issues as inner Observables amass in an unbounded buffer waiting for their turn to be subscribed to.
This is a problem for me because I have a fast, memory-intensive concatMap feeding a slow concatMap. It's set up like this:
```javascript
let uploadObs = Observable.range(0, blockCount)
    .concatMap(blockIndex => {
        // This part is fast and memory intensive. I'd like to use
        // a bounded buffer here or something similar to control
        // memory utilization
        let blockReaderObs = ...;
        // ... read a block from a large file object in blockReaderObs
        return blockReaderObs;
    })
    .concatMap((blockData, index) => {
        // This part involves a POST so is much slower than reading a
        // file block
        let objFromBlockData = someTransformation(blockData);
        return this.http.post(someUrl, objFromBlockData)
            .map(transformResponse);
    });
```
What is the right approach to dealing with this kind of problem in RxJS?
This is a classic producer-consumer problem: the reader produces blocks faster than the uploader can consume them, so the buffer between the two concatMaps grows without bound. The fix is backpressure — only read a block when the uploader is ready for it. RxJS 4 ships backpressure operators for exactly this (see controlled streams, `Rx.Observable.prototype.controlled`). Those operators were removed in RxJS 5, which your code appears to use; there the usual approach is to make the read lazy and move it inside a single concatMap, so each block is read on demand just before its upload.