How to pass through items with certain logic?


I'm looking for a way to pass items through with certain logic. The most obvious answer would probably be to use .Select, which works in most cases, but I have a special case. The question can be rephrased as: how do I call a certain method after an item has been consumed by all subscribers?

I was thinking of an extension method like PassThrough<TSource, TResult>(this IObservable<TSource> obj, Action<TSource, IObserver<TResult>> selector), which I would use in the following way:

.PassThrough((source, observer) => {
   if(source != null) {
      using(var result = new Result(source)) {
         observer.OnNext(result);
      }
   }
});

The most important part is calling .Dispose on the result object after it has been passed to OnNext, in other words after it has been consumed by subscribers. I couldn't find such an extension method. Could somebody give an example of how to achieve this with the existing Rx.NET API, or how to create an extension that does this, assuming it is possible?

There is 1 answer

Answered by aleksk

What you are looking for is probably Observable.Create wrapped in an extension method. In your case it might look like this:

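// Requires: using System; using System.Reactive.Disposables; using System.Reactive.Linq;
public static IObservable<Result> PassThrough<TSource>(this IObservable<TSource> obj, TSource source) {
   return Observable.Create<Result>(observer => {
      if(source != null) {
         using(var result = new Result(source)) {
            observer.OnNext(result);   // result is disposed only after OnNext returns
            observer.OnCompleted();
         }
      } else {
         // Placeholder error; substitute whatever exception fits your case.
         observer.OnError(new ArgumentNullException(nameof(source)));
         // ...
      }
      return Disposable.Empty;   // nothing to clean up on unsubscribe
   });
}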

If you want to keep the stream going after the item is emitted, omit the OnCompleted() call.
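For the signature proposed in the question, one possible shape is to subscribe to the upstream sequence inside Observable.Create and hand each item to the selector. The following is only a minimal sketch, assuming the System.Reactive package is referenced. The names PassThroughExtensions and Demo, and this particular Result class, are hypothetical stand-ins and not part of Rx.NET.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical disposable wrapper standing in for the question's Result type.
public sealed class Result : IDisposable
{
    public Result(string value) { Value = value; }
    public string Value { get; }
    public bool IsDisposed { get; private set; }
    public void Dispose() { IsDisposed = true; }
}

public static class PassThroughExtensions
{
    // For every upstream item, invokes the selector with the item and the
    // downstream observer. Upstream errors and completion are forwarded as-is.
    public static IObservable<TResult> PassThrough<TSource, TResult>(
        this IObservable<TSource> source,
        Action<TSource, IObserver<TResult>> selector)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (selector == null) throw new ArgumentNullException(nameof(selector));

        return Observable.Create<TResult>(observer =>
            source.Subscribe(
                item =>
                {
                    // An exception thrown by the selector becomes an OnError notification.
                    try { selector(item, observer); }
                    catch (Exception ex) { observer.OnError(ex); }
                },
                observer.OnError,
                observer.OnCompleted));
    }
}

public static class Demo
{
    // Returns a log of what the subscriber saw and when each Result was disposed.
    public static List<string> Run()
    {
        var log = new List<string>();
        var items = new[] { "a", null, "b" }.ToObservable();

        // TResult cannot be inferred from the lambda, so the type arguments are explicit.
        items.PassThrough<string, Result>((source, observer) =>
            {
                if (source != null)
                {
                    using (var result = new Result(source))
                    {
                        observer.OnNext(result); // returns once synchronous subscribers are done
                    }
                    log.Add("disposed " + source);
                }
            })
            .Subscribe(r => log.Add("consumed " + r.Value + " disposed=" + r.IsDisposed));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Two caveats apply to this sketch. Disposal happens when OnNext returns, so it only follows consumption if everything downstream runs synchronously; an operator such as ObserveOn would hand the item to another scheduler, and the object could be disposed before the subscriber sees it. Also, each subscription runs the selector separately, so every subscriber gets its own Result; if a single instance should be shared by all subscribers, appending .Publish().RefCount() is one option to consider.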

See here for more information on usage of Observable.Create: http://introtorx.com/Content/v1.0.10621.0/04_CreatingObservableSequences.html#CreationOfObservables