Is it the best to implement ObservableBase in this situation or is there another way?

1.6k views Asked by At

First of all, I didn't find a good example of custom implementation of the ObservableBase or AnonymousObservable. I have no idea which one I need to implement in my case if any. The situation is this.

I use a third-party library and there is a class let's call it Producer which allows me to set a delegate on it like objProducer.Attach(MyHandler). MyHandler will receive messages from the Producer. I'm trying to create a wrapper around the Producer to make it observable and ideally to be it a distinct type instead of creating just an instance of observable (like Observable.Create).

EDITED: Third-party Producer has the following interface

public delegate void ProducerMessageHandler(Message objMessage);
public class Producer : IDisposable {
   public void Start();
   public void Attach(ProducerMessageHandler fnHandler);
   public void Dispose();
}

as I mentioned I have no control over the source code of it. It is intended to be used like this: create an instance, call Attach and pass a delegate, call Start which basically initiates receiving messages inside the provided delegated when Producer receives them or generates them.

I was thinking about creating public class ProducerObservable : ObservableBase<Message> so that when somebody subscribes to it I would (Rx library would) push messages to the observers. It seems that I need to call Attach somewhere in the constructor of my ProducerObservable, then I need somehow to call OnNext on the observers attached to it. Does it mean that I have to code all this: add a list of observers LinkedList<IObserver<Message>> to the class and then add observers when SubscribeCore abstract method is called on the ProducerObservable? Then apparently I would be able to enumerate the LinkedList<IObserver<Message>> in the MyHandler and call OnNext for each one. All these looks feasible but it doesn't feel exactly right. I would expect .net reactive extensions to be better prepare to situations like this and have at least the implementation of the LinkedList<IObserver<Message>> ready somewhere in base class.

3

There are 3 answers

0
alex.49.98 On BEST ANSWER

This discussion just gave me an idea. Isn't it just this?

public class ProducerObservable : IObservable<Message>, IDisposable {
   private readonly Producer _Producer;
   private readonly Subject<Message> _Subject;

   public ProducerObservable() {
      _Produder = new Producer();
      _Producer.Attach(Message_Received);
      _Subject = new Subject<Message>();
      _Producer.Start();
   }

   public void Dispose() {
      _Producer.Dispose();
      _Subject.Dispose();
   }

   public IDisposable Subscribe(IObserver<Message> objObserver) {
      return _Subject.Subscribe(objObserver);
   }

   private void Message_Received(Message objMessage) {
      _Subject.OnNext(objMessage);
   }
}

Thus, it seems to me we avoid extra levels, extra observables, having just a single observable type and basically I see only advantages and no disadvantages.

6
Enigmativity On

Here's what you should be doing to be Rx "friendly":

public static class ObservableProducer
{
    public static IObservable<Message> Create()
    {
        return 
            Observable.Using(() => new Producer(), p =>
                Observable.Create<Message>(o => 
                {
                    ProducerMessageHandler handler = m => o.OnNext(m);
                    p.Attach(handler);
                    return Disposable.Create(() => o.OnCompleted());
                }));
    }
}

You would use this like this:

IObservable<Message> query = ObservableProducer.Create();

You should allow multiple Producer instances to be created for all new subscriptions - that's how Rx works.

However, if you only want a single Producer instance then look at using .Publish() on this observable.

Here's how to make sure that the single Producer instance is "self-managing":

IObservable<Message> query = ObservableProducer.Create().Publish().RefCount();

This will create a single Producer instance on the first subscription and keep that Producer until there are no longer any subscriptions. This makes it "self-managing" and a better solution that rolling your own class.

If you have to implement your own class then you are often going to make mistakes. The class you added as an answer to this question has three that I can see.

  1. You instantiate the subject after you attach the message handler. If the producer creates a message during the attach process your code will fail.
  2. You don't keep track of the subscriptions. If you don't track your subscriptions then you can't dispose of them. Rx queries can hold open expensive resources so you should dispose of them as early as possible.
  3. You don't call .OnCompleted() on the subject prior to disposal of the producer.

Here's my implementation of your class:

public class ProducerObservable : IObservable<Message>, IDisposable
{
    private readonly Producer _Producer;
    private readonly Subject<Message> _Subject;
    private readonly CompositeDisposable _Disposables;

    public ProducerObservable()
    {
        _Subject = new Subject<Message>();
        ProducerMessageHandler fnHandler = m => _Subject.OnNext(m);

        _Producer = new Producer();
        _Producer.Attach(fnHandler);
        _Producer.Start();

        _Disposables = new CompositeDisposable();
        _Disposables.Add(_Producer);
        _Disposables.Add(_Subject);
    }

    public void Dispose()
    {
        _Subject.OnCompleted();
        _Disposables.Dispose();
    }

    public IDisposable Subscribe(IObserver<Message> objObserver)
    {
        var subscription = _Subject.Subscribe(objObserver);
        _Disposables.Add(subscription);
        return subscription;
    }
}

I still don't like it. At the time of writing this I'm one of three people with a silver badge in [system.reactive] (no-one has gold yet) and I've never implemented my own observable. I only just realized that I didn't call .OnCompleted() on the subject, so I went back and edited my code above. It's a minefield. It's so much better relying on the inbuilt operators.

The reason that ObservableBase exists is to help prevent people making mistakes, but it doesn't stop it.

2
Brandon On

In code that uses Rx, "Producer" objects are usually objects that expose IObservable<T> instances via public properties or methods. It is less common that the Producer class itself would implement IObservable<T>, and when it does, it does so by using Rx to do the heavy lifting under the hood. You absolutely never want to implement an IObservable<T> yourself.

Here's an example where the observable is exposed as a property:

public class Producer
{
    public Producer(ThirdPartyLib.Producer p)
    {
        var c = Observable.Create(observer =>
        {
            ProducerMessageHandler h = msg => observer.OnNext(msg);
            p.Attach(h);
            p.Start();

            return Disposable.Empty;
        }).Publish();

        // Connect the observable the first time someone starts
        // observing
        Stream = Observable.Create(observer =>
        {
            var subscription = c.Subscribe(observer);
            if (Interlocked.Exchange(ref _connected, 1) == 0)
            {
                c.Connect();
            }

            return subscription;
        });
    }

    private int _connected;
    public IObservable<Message> Stream { get; private set; }
}

And here is the same example where we actually implement IObservable<T> by delegating to Rx:

public class Producer : IObservable<Message>
{
    public Producer(ThirdPartyLib.Producer p)
    {
        var c = Observable.Create(observer =>
        {
            ProducerMessageHandler h = msg => observer.OnNext(msg);
            p.Attach(h);
            p.Start();

            return Disposable.Empty;
        }).Publish();

        // Connect the observable the first time someone starts
        // observing
        _stream = Observable.Create(observer =>
        {
            var subscription = c.Subscribe(observer);
            if (Interlocked.Exchange(ref _connected, 1) == 0)
            {
                c.Connect();
            }

            return subscription;
        });
    }

    private IObservable<Message> _stream;

    // implement IObservable<T> by delegating to Rx
    public IDisposable Subscribe(IObserver<Message> observer)
    {
        return _stream.Subscribe(observer);
    }
}