The policies of the Polly library, for example `Bulkhead`, `Retry` etc., contain a method `ExecuteAsync` with many overloads (18), but none of them allows executing the policy for all elements of an `IEnumerable` and gathering the results. It seems that the whole library is focused on the goal of executing a single action, leaving the responsibility of managing multiple executions to the client code. I would like to fix this omission by implementing an extension method for all Polly policies (all implementations of the `IAsyncPolicy` interface), with the signature below:
    public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
        this IAsyncPolicy policy,
        IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> action,
        bool continueOnCapturedContext = false,
        bool onErrorContinue = false)
The `continueOnCapturedContext` parameter controls whether to continue on the captured synchronization context, and should just be passed through to the native `ExecuteAsync` method:
    Task<TResult> IAsyncPolicy.ExecuteAsync<TResult>(
        Func<CancellationToken, Task<TResult>> action,
        CancellationToken cancellationToken,
        bool continueOnCapturedContext);
The `onErrorContinue` parameter is the most important aspect of this question, since it controls the behavior in case of a policy failure. My intention is to use this extension method with thousands of elements, and in case of any exceptions that are not expected/handled by my policy¹ I would like to terminate the whole execution promptly and gracefully. If the argument `onErrorContinue` has the default value `false`, the first unhandled exception should cause the cancellation of all pending operations, and the whole execution should terminate as soon as all started operations have completed. In the opposite case of `onErrorContinue: true`, all elements should be processed by the policy. Finally, all exceptions should be propagated, bundled in an `AggregateException`, independently of the `onErrorContinue` value.
How could I implement this extension method?
Hypothetical usage scenario of this method:
    var policy = Policy
        .BulkheadAsync(maxParallelization: 10, maxQueuingActions: Int32.MaxValue)
        .WrapAsync(Policy
            .Handle<HttpRequestException>()
            .WaitAndRetryAsync(retryCount: 3,
                sleepDurationProvider: n => TimeSpan.FromMilliseconds(1000 * n))
        );

    var urls = Enumerable.Range(1, 1000).Select(n => n.ToString());
    var random = new Random(0);
    string[] results = await policy.ExecuteAsync(urls, async url =>
    {
        await Task.Delay(500); // Simulate a web request
        lock (random) if (random.NextDouble() < 0.66)
            throw new HttpRequestException($"Url #{url} failed");
        return url;
    }, onErrorContinue: false);
¹ This should happen rarely in production, but may happen frequently during development, and could hurt productivity.
Here is my implementation of the `ExecuteAsync` method. A `CancellationTokenSource` is used for cancelling the pending operations in case of an exception. The `Task.WhenAll` does a good job at ignoring the `OperationCanceledException`s, when there are more important exceptions to propagate. Finally the `Task.WhenAll` task is returned without being `await`ed, in order to preserve all exceptions.

Emulating the `Task.WhenAll` behavior, which is desirable in this case, is quite tricky otherwise (with async/await). So I am happily avoiding this trouble by using a small and dirty `ContinueWith`, in order to finally dispose the `CancellationTokenSource`.

An alternative way of dealing with multiple exceptions is presented here. This solution propagates a nested `AggregateException`, which sounds ugly, but in practice it's OK because `await`ing an asynchronous method eliminates one level of nesting anyway.
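
A minimal sketch of an implementation matching the description above (a `CancellationTokenSource` for cancelling pending operations, a `Task.WhenAll` task returned without being awaited, and a `ContinueWith` for disposal) might look like the following. It is an illustration under those assumptions and not necessarily the exact code of this answer; the class name `PollyExtensions` is hypothetical, and the sketch assumes the Polly package that provides `IAsyncPolicy` is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Polly;

public static class PollyExtensions // hypothetical class name
{
    public static Task<TResult[]> ExecuteAsync<TSource, TResult>(
        this IAsyncPolicy policy,
        IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> action,
        bool continueOnCapturedContext = false,
        bool onErrorContinue = false)
    {
        if (policy == null) throw new ArgumentNullException(nameof(policy));
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (action == null) throw new ArgumentNullException(nameof(action));

        var cts = new CancellationTokenSource();
        // With onErrorContinue: true the policy receives a token that is never canceled.
        CancellationToken token = onErrorContinue ? CancellationToken.None : cts.Token;

        // ToArray starts one policy execution per element; any throttling is left
        // to the policy itself (for example a bulkhead).
        Task<TResult>[] tasks = source.Select(async item =>
        {
            try
            {
                // The action does not receive the token, so cancellation only
                // affects executions that the policy has not started yet.
                return await policy
                    .ExecuteAsync<TResult>(_ => action(item), token, continueOnCapturedContext)
                    .ConfigureAwait(continueOnCapturedContext);
            }
            catch
            {
                // First failure not handled by the policy: cancel pending operations.
                if (!onErrorContinue) cts.Cancel();
                throw;
            }
        }).ToArray();

        // Returned without awaiting, so the task's Exception property keeps every failure.
        Task<TResult[]> whenAll = Task.WhenAll(tasks);

        // Dispose the CancellationTokenSource once all tasks have completed.
        _ = whenAll.ContinueWith(t => cts.Dispose(), CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
        return whenAll;
    }
}
```

Note that `await`ing the returned task rethrows only the first exception. To inspect all of them, keep a reference to the task, `await` it inside a `try`/`catch`, and read `task.Exception.InnerExceptions` in the `catch` block.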