C# Rx How to properly dispose of source Enumerable in created Observable


I would like to adapt a source that implements both IEnumerable and IDisposable into an Observable. What is the best way to do this so that source.Dispose gets called upon unsubscription?

There is an example on introtorx.com of adapting an IEnumerable, but it explicitly states that it has many shortcomings (incorrect disposal pattern, poor concurrency model, no error handling, and so on) and that the built-in version handles these. However, the built-in version doesn't seem to call Dispose on the source IEnumerable upon unsubscription.

Ideally I'd like to use the .Publish().RefCount() pattern to have multiple subscribers on the same source and only have the source Dispose() called when they are all unsubscribed.

Here is the code for my attempt, though it's not working.

static void FromEnumerableTest() {
    var observable = Observable.Create<int>(
        observer => {
            var source = new JunkEnumerable();
            foreach (int i in source) {
                observer.OnNext(i);
            }
            return () => {
                source.Dispose();
            };
        })
        .SubscribeOn(Scheduler.Default)
        .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
        .Publish()
        .RefCount();

    //var observable = Observable.ToObservable(new JunkEnumerable())
    //    .SubscribeOn(Scheduler.Default)
    //    .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
    //    .Publish()
    //    .RefCount();

    Console.WriteLine("Press any key to subscribe");
    Console.ReadKey();

    var subscription = observable.Subscribe(i => Console.WriteLine("subscription : {0}", i));
    Console.WriteLine("Press any key to unsubscribe");
    Console.ReadKey();
    subscription.Dispose();

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}


class JunkEnumerable : IEnumerable<int>, IDisposable {
    public void Dispose() { Console.WriteLine("JunkEnumerable.Dispose invoked"); }

    public IEnumerator<int> GetEnumerator() { return new Enumerator(); }

    IEnumerator IEnumerable.GetEnumerator() { return GetEnumerator(); }

    class Enumerator : IEnumerator<int> {
        private int counter = 0;
        public int Current {
            get {
                Thread.Sleep(1000);
                return counter++;
            }
        }

        object IEnumerator.Current { get { return Current; } }

        public void Dispose() { Console.WriteLine("JunkEnumerable.Enumerator.Dispose invoked"); }

        public bool MoveNext() { return true; }

        public void Reset() { counter = 0; }
    }
}

There are 2 answers

1
Shlomo (BEST ANSWER)

There are three stages in an Rx subscription-lifetime:

  1. Subscription
  2. Observation
  3. Unsubscription

If the subscription stage never completes, the unsubscription code never runs. After all, if you never fully subscribed, why should you need to unsubscribe? The disposal action you return from Observable.Create only becomes available once that function returns. Your sample code has an infinite loop inside it, so it never returns, and the unsubscription code can never happen.
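To make that concrete, here is a minimal sketch of a subscribe function that does return. It is not from the question and is only one possible fix. It assumes the Observable.Create overload that takes a CancellationToken and returns a Task; the names CancellableEnumeration and ToCancellableObservable are made up for illustration. The loop runs on a thread-pool task, so the subscription stage completes right away, and the token is signalled when the subscription is disposed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class CancellableEnumeration
{
    // Hypothetical helper: enumerates on a background task and stops
    // once the subscriber has unsubscribed.
    public static IObservable<T> ToCancellableObservable<T>(Func<IEnumerable<T>> sourceFactory)
    {
        return Observable.Create<T>((observer, cancel) => Task.Run(() =>
        {
            var source = sourceFactory();
            try
            {
                foreach (var item in source)
                {
                    // The token is cancelled when the subscription is disposed.
                    if (cancel.IsCancellationRequested) return;
                    observer.OnNext(item);
                }
            }
            finally
            {
                // Dispose the source itself if it is disposable;
                // foreach has already disposed the enumerator.
                (source as IDisposable)?.Dispose();
            }
        }));
    }
}
```

With the question's JunkEnumerable this would be called as CancellableEnumeration.ToCancellableObservable<int>(() => new JunkEnumerable()). Note that cancellation is only checked between items, so a slow Current or MoveNext delays disposal by up to one item.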

The normal way to handle an IDisposable is with Observable.Using. The normal way to handle an IEnumerable is with .ToObservable. If you're trying to introduce asynchrony to synchronous, enumerable code (like your example), you can do so as follows:

var observable = Observable.Using(() => new JunkEnumerable(), junk => 
    Observable.Generate(junk.GetEnumerator(), e => e.MoveNext(), e => e, e => e.Current, e => TimeSpan.FromMilliseconds(20))
);

As long as the TimeSpan is greater than 15 ms, Rx will run it asynchronously, which lets the subscription stage complete. The subsequent values are then produced during the observation stage, and unsubscription can take place fully.
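One thing the snippet above leaves open is the enumerator returned by GetEnumerator, which is never disposed. A possible variation, sketched here under the assumption that both the source and its enumerator should be cleaned up, nests a second Observable.Using around the enumerator. The names DisposableGenerate and FromSource are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class DisposableGenerate
{
    // Hypothetical helper: Using + Generate as above, but the enumerator
    // is treated as a second disposable resource.
    public static IObservable<T> FromSource<T, TSource>(Func<TSource> sourceFactory, TimeSpan interval)
        where TSource : class, IEnumerable<T>, IDisposable
    {
        return Observable.Using(
            sourceFactory,
            source => Observable.Using(
                () => ((IEnumerable<T>)source).GetEnumerator(),
                enumerator => Observable.Generate(
                    enumerator,
                    e => e.MoveNext(),   // continue while the source has items
                    e => e,              // the enumerator itself is the state
                    e => e.Current,      // value to emit
                    _ => interval)));    // delay before each value
    }
}
```

For the question's type this would be DisposableGenerate.FromSource<int, JunkEnumerable>(() => new JunkEnumerable(), TimeSpan.FromMilliseconds(20)). Appending .Publish().RefCount() should give the shared behaviour asked for, with both Dispose methods called once the last subscriber leaves.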

0
Asti

Here's an operator that runs the enumeration on a specified scheduler. Each step of the enumeration is scheduled separately, so the subscribe function returns its disposable right away and unsubscription can stop the enumeration.

    public static IObservable<T> ToObservableOn<T>(this IEnumerable<T> source, IScheduler scheduler = default(IScheduler))
    {
        scheduler = scheduler ?? Scheduler.Default;
        return Observable.Create<T>(
            (observer) =>
            {
                var disposed = new BooleanDisposable();
                var enumerator = source.GetEnumerator();

                Action scheduleNext = default(Action);
                scheduleNext = () =>
                {
                    if (disposed.IsDisposed)
                        return;

                    if (!enumerator.MoveNext())
                    {
                        observer.OnCompleted();
                        return;
                    }

                    observer.OnNext(enumerator.Current);

                    scheduler.Schedule(scheduleNext);
                };

                scheduler.Schedule(scheduleNext);
                return StableCompositeDisposable.Create(disposed, enumerator);
            });
    }

In your example, replace the Observable.Create and SubscribeOn calls with:

        var observable = 
            new JunkEnumerable()
            .ToObservableOn(Scheduler.Default)                
            .Do(i => Console.WriteLine("Publishing {0}", i))    // side effect to show it is running
            .Publish()
            .RefCount();
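Note that the operator above disposes the enumerator but not the JunkEnumerable itself. If the source's own Dispose should also run when the last subscriber leaves, one option is to wrap the whole thing in Observable.Using. The sketch below is an assumption-laden illustration: so that it compiles on its own, it uses the built-in ToObservable(IScheduler) overload as a stand-in for ToObservableOn, and the names DisposableSourceExtensions and ToSharedObservable are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

static class DisposableSourceExtensions
{
    // Hypothetical helper: ties the source's lifetime to the shared subscription.
    public static IObservable<T> ToSharedObservable<T, TSource>(Func<TSource> sourceFactory, IScheduler scheduler)
        where TSource : class, IEnumerable<T>, IDisposable
    {
        return Observable
            // Using disposes the source when the underlying subscription ends.
            .Using(sourceFactory, source => Observable.ToObservable<T>(source, scheduler))
            // Publish + RefCount share one underlying subscription among all
            // subscribers and release it when the last one unsubscribes.
            .Publish()
            .RefCount();
    }
}
```

Usage would look like DisposableSourceExtensions.ToSharedObservable<int, JunkEnumerable>(() => new JunkEnumerable(), Scheduler.Default). Because the factory runs on each connection, a fresh JunkEnumerable is created whenever the subscriber count goes from zero to one again.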