Controlling the max thread count of SelectMany


Is there a method I can use to set the maximum thread count of an IObservable.SelectMany?

The following code works great for updating the UI as items are processed, but the task I'm trying to execute is a bit of a resource hog at the moment. I would like to cap it at about two threads to be a bit lighter on resource usage.

AsyncCommand = ReactiveCommand.CreateAsyncObservable(_ => 
{
    // Set progress bar indicator to 0

    var set = new [] {...}; // The set of items to process

    // Set the progress bar indicator max to the count of the items to process

    return set
        .ToObservable()
        .SelectMany((item, index) => Task.Run(() =>
        {
            // Process item

            return item;
        }), (item, index, processed) => item); 
});

AsyncCommand
    .ObserveOn(RxApp.MainThreadScheduler)
    .Subscribe(item => 
    {
        // Step the progress bar indicator
    });

There are 2 answers

paulpdaniels (best answer)

Merge has a max parallelism parameter:

AsyncCommand = ReactiveCommand.CreateAsyncObservable(_ => 
{
    // Set progress bar indicator to 0

    var set = new [] {...}; // The set of items to process

    // Set the progress bar indicator max to the count of the items to process

    return set
        .ToObservable()
        .Select(item => Observable.FromAsync(() => DoAsyncProcess(item)))
        .Merge(2);
});
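
To see the limit in action outside of ReactiveUI, here is a minimal console-style sketch. `MergeThrottleDemo`, its counters, and this `DoAsyncProcess` body are hypothetical stand-ins (the real processing code is not shown in the question); only `Select`, `Observable.FromAsync`, and `Merge(maxConcurrent)` are the pieces from the answer. Note that `Merge(2)` limits how many inner observables are subscribed at once, which in turn limits how many operations are in flight.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class MergeThrottleDemo
{
    private static int _running; // operations currently in flight
    private static int _peak;    // highest overlap observed so far

    // Hypothetical stand-in for the real work; it only waits and counts overlap.
    private static async Task<int> DoAsyncProcess(int item)
    {
        RecordPeak(Interlocked.Increment(ref _running));
        await Task.Delay(50);
        Interlocked.Decrement(ref _running);
        return item;
    }

    // Lock-free "store the maximum" helper.
    private static void RecordPeak(int now)
    {
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)))
        {
            if (Interlocked.CompareExchange(ref _peak, now, seen) == seen) break;
        }
    }

    // Processes itemCount items with at most maxConcurrent in flight
    // and returns the highest overlap that was observed.
    public static int Run(int itemCount, int maxConcurrent)
    {
        _running = 0;
        _peak = 0;

        Enumerable.Range(0, itemCount)
            .ToObservable()
            // FromAsync is lazy: the task starts only when Merge subscribes.
            .Select(item => Observable.FromAsync(() => DoAsyncProcess(item)))
            .Merge(maxConcurrent)
            .ToList()
            .Wait(); // block until every item has been processed

        return _peak;
    }
}
```

The laziness of `FromAsync` is what makes this work: passing an already started `Task` (as `Task.Run` inside `SelectMany` does) would start all the work immediately, regardless of the `Merge` limit.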

See also a more advanced solution

Daniel C. Weber

SelectMany, as used in your code snippet, does not introduce any concurrency by itself and therefore does not create any threads. It's the TPL that does (since you use Task.Run). The TPL usually does a good job of not creating more threads than it needs to get the job done. Have a look here and subsequently here if you really want to throttle the maximum number of threads.
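
One way to cap how many tasks run at once at the TPL level is the built-in `ConcurrentExclusiveSchedulerPair`. This is a sketch of that technique and not necessarily what the links above describe; `SchedulerThrottleDemo`, its counters, and the `Thread.Sleep` placeholder for the real work are hypothetical.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerThrottleDemo
{
    private static int _running; // tasks currently executing
    private static int _peak;    // highest overlap observed so far

    private static void RecordPeak(int now)
    {
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)))
        {
            if (Interlocked.CompareExchange(ref _peak, now, seen) == seen) break;
        }
    }

    // Runs itemCount synchronous work items, at most maxConcurrent at a time,
    // and returns the highest overlap that was observed.
    public static int Run(int itemCount, int maxConcurrent)
    {
        _running = 0;
        _peak = 0;

        // The concurrent scheduler of the pair runs at most maxConcurrent tasks at once.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrent);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        Task[] tasks = Enumerable.Range(0, itemCount)
            .Select(item => factory.StartNew(() =>
            {
                RecordPeak(Interlocked.Increment(ref _running));
                Thread.Sleep(50); // placeholder for the CPU-bound processing
                Interlocked.Decrement(ref _running);
            }))
            .ToArray();

        Task.WaitAll(tasks);
        return _peak;
    }
}
```

The limit applies to code that runs on the scheduler, so it suits synchronous, CPU-bound work; an `await` inside the delegate would hand the slot back while waiting.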

As a simple alternative, use an AsyncSemaphore from Stephen Cleary's great AsyncEx package and put your processing code between WaitAsync and Release calls.
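
The same wait/release pattern can be sketched with the framework's own `SemaphoreSlim`, which is swapped in here for `AsyncSemaphore` so that the example needs no extra package. `SemaphoreThrottleDemo`, `ProcessAsync`, and the counters are hypothetical names, and `Thread.Sleep` stands in for the real processing.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottleDemo
{
    private static int _running; // items currently being processed
    private static int _peak;    // highest overlap observed so far

    private static void RecordPeak(int now)
    {
        int seen;
        while (now > (seen = Volatile.Read(ref _peak)))
        {
            if (Interlocked.CompareExchange(ref _peak, now, seen) == seen) break;
        }
    }

    private static async Task<int> ProcessAsync(SemaphoreSlim gate, int item)
    {
        await gate.WaitAsync(); // wait for a free slot without blocking a thread
        try
        {
            RecordPeak(Interlocked.Increment(ref _running));
            await Task.Run(() => Thread.Sleep(50)); // placeholder for the real work
            Interlocked.Decrement(ref _running);
            return item;
        }
        finally
        {
            gate.Release(); // always hand the slot back, even on failure
        }
    }

    // Processes itemCount items with at most maxConcurrent inside the gate
    // and returns the highest overlap that was observed.
    public static async Task<int> RunAsync(int itemCount, int maxConcurrent)
    {
        _running = 0;
        _peak = 0;

        var gate = new SemaphoreSlim(maxConcurrent);
        await Task.WhenAll(
            Enumerable.Range(0, itemCount).Select(item => ProcessAsync(gate, item)));

        return _peak;
    }
}
```

In the original snippet, the body of `ProcessAsync` would replace the `Task.Run` lambda passed to `SelectMany`, so every item is still started right away but only two get past the gate at a time.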