Map errors to observable using C# ReactiveX

578 views Asked by At

I have an observable MyObservable<Object> which can throw CustomExceptions where

private class CustomException : Exception

What I want to do is convert the CustomExceptions into objects and emit those in a new observable.

This is my solution so far but I was wondering if this could be done without having to directly call the Subject's onNext, onCompleted or onError methods.

var MySubject = new Subject<NewObject>();

MyObservable.Catch<Object, CustomException>(
            ex =>
            {
                NewObject o = new NewObject(ex.Message);
                MySubject.OnNext(o);
                return Observable.Empty<Object>();
            });

IObservable<IList<NewObject>> listObservable = MySubject.ToList();

Edit: Thanks ibebbs! Worked like a charm!

1

There are 1 answers

0
ibebbs On BEST ANSWER

You can catch and map exceptions without a subject by using the Materialize() function as shown here:

var errorObservable = source
    .Select(projection)
    .Materialize()
    .Where(notification => notification.Kind == NotificationKind.OnError)
    .Select(notification => notification.Exception)
    .OfType<CustomException>()
    .Select(exception => new NewObject(exception.Message));

The Materialize function takes an IObservable<T> and maps it to a IObservable<Notification<T>> where each Notification has a Kind of OnNext, OnError or OnComplete. The above observable simply looks for Notifications with a Kind`` of OnError and with the Exception being an instance of CustomException then projects these exceptions into anIObservable```.

Here is a unit test showing this working:

[Fact]
public void ShouldEmitErrorsToObservable()
{
    Subject<int> source = new Subject<int>();
    List<int> values = new List<int>();
    List<NewObject> errors = new List<NewObject>();

    Func<int, int> projection =
        value =>
        {
            if (value % 2 == 1) throw new CustomException("Item is odd");

            return value;
        };

    Func<CustomException, IObservable<int>> catcher = null;

    catcher = ex => source.Select(projection).Catch(catcher);

    var errorObservable = source
        .Select(projection)
        .Materialize()
        .Where(notification => notification.Kind == NotificationKind.OnError)
        .Select(notification => notification.Exception)
        .OfType<CustomException>()
        .Select(exception => new NewObject(exception.Message));

    var normalSubscription = source.Select(projection).Catch(catcher).Subscribe(values.Add);
    var errorSubscription = errorObservable.Subscribe(errors.Add);

    source.OnNext(0);
    source.OnNext(1);
    source.OnNext(2);

    Assert.Equal(2, values.Count);
    Assert.Equal(1, errors.Count);
}

However, as you can see with the construed catch mechanisms employed above, exception handling in Rx can be tricky to get right and even more difficult to do elegantly. Instead, consider that Exceptions should be Exceptional and, if you expect an class of error such that you've written a custom exception for it, then the error is not really exceptional but part of a process flow that must handle these errors.

In this instance, I would recommend projecting the observable into a class which embodies the "try this operation and record the result, be it a value or an exception" and using this further along the execution chain.

In the example below, I use a "Fallible" class to capture the result or exception of an operation and then subscribe to a stream of "Fallible" instances, separating the errors from values. As you will see, the code is both neater and better performing as both the errors and values share a single subscription to the underlying source:

internal class Fallible
{
    public static Fallible<TResult> Try<TResult, TException>(Func<TResult> action) where TException : Exception
    {
        try
        {
            return Success(action());
        }
        catch (TException exception)
        {
            return Error<TResult>(exception);
        }
    }

    public static Fallible<T> Success<T>(T value)
    {
        return new Fallible<T>(value);
    }

    public static Fallible<T> Error<T>(Exception exception)
    {
        return new Fallible<T>(exception);
    }
}

internal class Fallible<T>
{
    public Fallible(T value)
    {
        Value = value;
        IsSuccess = true;
    }

    public Fallible(Exception exception)
    {
        Exception = exception;
        IsError = true;
    }

    public T Value { get; private set; }
    public Exception Exception { get; private set; }
    public bool IsSuccess { get; private set; }
    public bool IsError { get; private set; }
}

[Fact]
public void ShouldMapErrorsToFallible()
{
    Subject<int> source = new Subject<int>();
    List<int> values = new List<int>();
    List<NewObject> errors = new List<NewObject>();

    Func<int, int> projection =
        value =>
        {
            if (value % 2 == 1) throw new CustomException("Item is odd");

            return value;
        };

    var observable = source
        .Select(value => Fallible.Try<int, CustomException>(() => projection(value)))
        .Publish()
        .RefCount();

    var errorSubscription = observable
        .Where(fallible => fallible.IsError)
        .Select(fallible => new NewObject(fallible.Exception.Message))
        .Subscribe(errors.Add);

    var normalSubscription = observable
        .Where(fallible => fallible.IsSuccess)
        .Select(fallible => fallible.Value)
        .Subscribe(values.Add);

    source.OnNext(0);
    source.OnNext(1);
    source.OnNext(2);

    Assert.Equal(2, values.Count);
    Assert.Equal(1, errors.Count);
}

Hope it helps.