RxJS - Continue sending notifications after error occurs in onNext


I am new to Rx, and I would really appreciate a little help with error handling. I have the following code, which is basically a typeahead:

var observable = Rx.Observable.fromEvent(searchField, 'keyup')
    .map(ev => ev.target.value)
    .filter(text => text.length > 2)
    .debounce(500 /* ms */)
    .distinctUntilChanged()
    .flatMapLatest(getAsyncSearchResults);

observable.subscribe(
    function(value) {
        console.log("onNext");
        throw "error"; // THIS TERMINATES THE OBSERVABLE
    },
    function(error) {
        console.log("Error");
    },
    function() {
        console.log("Completed");
    });

The Observable terminates after an exception occurs in the onNext function. This seems like unwanted behavior to me, since I don't want to wrap all of my onNext code in a try-catch. Is there any way to tell the observable to keep issuing notifications no matter what exception occurs?


There are 3 answers

Gluck

That is the behavior of Rx on every platform, and the only way to catch exceptions in observers is ... well ... to catch them.

If you often need this, you could create your own subscribeSafe method that wraps the callback in try/catch.
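
As a rough illustration of the subscribeSafe idea, here is a minimal sketch. The helper name comes from the suggestion above, but its signature, the handlers object, and the handlerError callback are assumptions made for this example and are not part of RxJS. It relies only on the source exposing subscribe(onNext, onError, onCompleted).

```javascript
// Hypothetical helper, not an RxJS API: guards the next handler so that an
// exception thrown inside it does not escape into the subscription.
function subscribeSafe(source, handlers) {
  const onNext = handlers.next || function () {};
  // Assumed default: log handler failures when no handlerError is supplied.
  const onHandlerError = handlers.handlerError || function (err) {
    console.error("next handler threw:", err);
  };

  return source.subscribe(
    function (value) {
      try {
        onNext(value);
      } catch (err) {
        // Swallow the exception here and report it separately from
        // errors emitted by the stream itself.
        onHandlerError(err);
      }
    },
    handlers.error,
    handlers.complete
  );
}
```

With the typeahead from the question, this could be called as subscribeSafe(observable, { next: render, handlerError: report }), where render and report are placeholder functions. Handler failures are kept apart from the stream's own error callback so the two cases are not confused.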

Alternatively, if you can move the parts that throw from the observer into the stream itself, you can control the error flow and, for instance, retry.

Example below:

// simulates (hot) keyup event
const obs1 = Rx.Observable.interval(1000).publish()
obs1.connect()

function getAsyncSearchResults(i) {
  return Rx.Observable.timer(500).mapTo(i)
}

obs1.switchMap(getAsyncSearchResults).do(x => {
  console.log(x) // every event gets logged
  throw new Error() // thrown inside the stream, so retry() can resubscribe
}).retry().subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>

James World

You should really make your next, completed and error handlers safe for exceptions. There is no reasonable way for the source stream to "resume" a subscription when the subscriber errors - the only reasonable course of action is to stop talking to the subscriber.

For a full discussion of why, see my answer to "How to handle exceptions thrown by observer's onNext in RxJava?", which applies just as well to any platform implementation of Rx.

Furthermore, see "Why is the OnError callback never called when throwing from the given subscriber?" for a good explanatory metaphor (the newsagent) of the considerations, particularly the need to consider that there may be other subscribers who have not thrown an error in their next handlers.

iamturns

You need to switch to a new disposable stream. If an error occurs within it, that stream is disposed of safely and the original stream stays alive.

I've simplified your code into a stream of the numbers 0 to 5 to make this easy to demonstrate. It throws an error if the number is 3.

Rx.Observable.from([0,1,2,3,4,5])
    .switchMap(value => {

        // This is the disposable stream!
        // Errors can safely occur in here without killing the original stream

        return Rx.Observable.of(value)
            .map(value => {
                if (value === 3) {
                    throw new Error('Value cannot be 3');
                }
                return value;
            })
            .catch(error => {
                // You can do some fancy stuff here with errors if you like
                // Below we are just returning the error object to the outer stream
                return Rx.Observable.of(error);
            });

    })
    .map(value => {
        if (value instanceof Error) {
            // Maybe do some error handling here
            return `Error: ${value.message}`;
        }
        return value;
    })
    .subscribe(
      (x => console.log('Success', x)),
      (x => console.log('Error', x)),
      (() => console.log('Complete'))
    );
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.1/Rx.min.js"></script>

More info on this technique in this blog post: The Quest for Meatballs: Continue RxJS Streams When Errors Occur