How to do I show progress when using Reactive Extensions in C#

2.5k views Asked by At

Am using reactive extensions in C# to perform some calculations. Here is how my code looks like so far. I have tried to wrap the code around so that I can show progress while to executing a series of tasks within my Calculate method

Here is the observable

IObservable<ResultWithProgress<SampleResult>> Calculate(){
  return Observable.Create<ResultWithProgress<SampleResult>>(obs => {
     var someTask = DoSomeTask1();
     obs.OnNext(new ResultWithProgress(){Progress = 25, ProgressText     ="Completed Task1"});
     var someOtherTask = DoSomeMoreTask();
     obs.OnNext(new ResultWithProgress(){Progress = 50, ProgressText ="Completed Task2"});
     var calcResult = DoSomeMoreTask2();
     obs.OnNext(new ResultWithProgress(){Progress = 75, ProgressText = "Completed Task3"});
     var calcResult = FinalCalc();
     obs.OnNext(new ResultWithProgress(){Progress = 100, ProgressText ="Completed Task4", Result = calcResult});
     obs.OnCompleted();
  }

}

Result Class wrapping progress and result

class ResultWithProgress<T>{
 public int Progress {get; set;}
 public Result T {get; set;}
 public string ProgressText {get; set;}
}

Result object which contains the final result class SampleResult{}

Usage:

Calculate().Subscribe(resultWithProgress => {
  if(resultWithProgress.Result == null) //Show progress using   resultWithProgress.Progress
  else // get the result
})

I somehow feel that this might not the best way to do it. It feels that creating ResultWithProgress object many times without the Result seems like a code smell, especially if I have more than 10 tasks that I want to do within my Calculate()

I would appreciate it if you can give me any pointers on how to use this or am I approaching this problem wrongly?

2

There are 2 answers

0
Brandon On

This answer uses the same principles Enigmativity's answer discusses.

This version uses the async overload of Create.

It also makes use of the .NET 4.5 IProgress instead of a raw Action<T> to report progress.

struct CalculationProgress
{
    public int Progress { get; private set; }
    public string ProgressText { get; private set; }

    public CalculationProgress(int progress, string progressText)
        : this()
    {
        Progress = progress;
        ProgressText = progressText;
    }
}

public IObservable<Result> Calculate(IProgress<CalculationProgress> progress)
{
    return Observable.Create<Result>((observer, cancellationToken) =>
    {
        // run the work on a background thread
        // so we do not block the subscriber
        // and thus the subscriber has a chance
        // to unsubscribe (and cancel the work if desired)
        return Task.Run(() =>
        {
            DoSomeTask1();
            cancellationToken.ThrowIfCancellationRequested();
            progress.Report(new CalculationProgress(25, "First task"));

            DoSomeTask2();
            cancellationToken.ThrowIfCancellationRequested();
            progress.Report(new CalculationProgress(50, "Second task"));

            DoSomeTask3();
            cancellationToken.ThrowIfCancellationRequested();
            progress.Report(new CalculationProgress(75, "third task"));

            var result = DoFinalCalculation();            
            cancellationToken.ThrowIfCancellationRequested();
            progress.Report(new CalculationProgress(100, "final task"));

            observer.OnNext(result);
        }, cancellationToken);
    });
}
2
Enigmativity On

It took me some time to actually get your code to run. There were numerous syntax errors, but most importantly your Observable.Create did not have a return value.

Observable.Create should create an observable that the obs variable subscribes to and you return that IDisposable. That's so a subscriber can terminate the observable before it has completed.

Your observable directly interacts with the obs and finally calls obs.OnComplete() before the Observable.Create is completed. This means that there is no opportunity for the calling subscriber to terminate the computation because it has completed before the subscription has finished!

What you need is a way to build an observable within the Observable.Create to make it behave properly.

Now, since you are trying to return progress during your computation you are expecting side-effects. So it is easier to inject state at the beginning and just have a pure observable otherwise.

Here's how I might go about doing this.

First I change the signature of Calculate to become:

IObservable<string> Calculate(Action<ResultWithProgress<string>> progress)

Now I am injecting an action that I can use to report on my progress.

Here's how the call to Calculate might look:

Calculate(rwp => Console.WriteLine(rwp)).Subscribe(result => { });

Now here's the full Calculate method:

public IObservable<string> Calculate(Action<ResultWithProgress<string>> progress)
{
    return Observable.Create<string>(obs =>
    {
        // This action just removes duplication from the query below
        // and has the purpose of safely calling `progress`
        Action<int, string, string> report = (pv, r, pt) =>
        {
            var p = progress;
            if (p != null)
            {
                p(new ResultWithProgress<string>()
                {
                    Progress = pv,
                    Result = r,
                    ProgressText = pt,
                });
            }
        };

        var query =
            from someTask in Observable.Start(() => DoSomeTask1())
                .Do(x => report(25, x, "Completed Task1"))
            from someOtherTask in Observable.Start(() => DoSomeMoreTask())
                .Do(x => report(50, x, "Completed Task2"))
            from calcResultX in Observable.Start(() => DoSomeMoreTask2())
                .Do(x => report(75, x, "Completed Task3"))
            from calcResult in Observable.Start(() => DoSomeTask1())
                .Do(x => report(100, x, "Completed Task4"))
            select calcResult;

        return query.Subscribe(obs);
    });
}