I have a case where I need to receive data from more than one IAsyncEnumerable source. For performance, this should be done in parallel.
I have written the following code to achieve this, using the AsyncAwaitBestPractices, System.Threading.Tasks.Dataflow and System.Linq.Async NuGet packages:
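```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using AsyncAwaitBestPractices;

public static class AsyncEnumerableExtensions
{
    public static async IAsyncEnumerable<T> ExecuteSimultaneouslyAsync<T>(
        this IEnumerable<IAsyncEnumerable<T>> sources,
        int outputQueueCapacity = 1,
        TaskScheduler scheduler = null)
    {
        var sourcesCount = sources.Count();

        // Bounded output queue of the requested capacity; unbounded if capacity <= 0
        var channel = outputQueueCapacity > 0
            ? Channel.CreateBounded<T>(outputQueueCapacity)
            : Channel.CreateUnbounded<T>();

        // Copy every source into the channel concurrently, one worker per source.
        // The continuation completes the writer whether or not a source failed,
        // and the whole chain is fired and forgotten.
        sources.AsyncParallelForEach(
                async body =>
                {
                    await foreach (var item in body)
                    {
                        await channel.Writer.WaitToWriteAsync();
                        await channel.Writer.WriteAsync(item);
                    }
                },
                maxDegreeOfParallelism: sourcesCount,
                scheduler: scheduler)
            .ContinueWith(_ => channel.Writer.Complete())
            .SafeFireAndForget();

        // Yield items as they arrive until the writer is completed
        while (await channel.Reader.WaitToReadAsync())
            yield return await channel.Reader.ReadAsync();
    }

    // Runs body for every item of source through an ActionBlock
    // with the given degree of parallelism
    public static async Task AsyncParallelForEach<T>(
        this IEnumerable<T> source,
        Func<T, Task> body,
        int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
        TaskScheduler scheduler = null)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        };
        if (scheduler != null)
            options.TaskScheduler = scheduler;

        var block = new ActionBlock<T>(body, options);
        foreach (var item in source)
            block.Post(item);
        block.Complete();
        await block.Completion;
    }
}
```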
This code works fine until two or more sources throw exceptions. In that situation the second exception cannot be handled, and in some cases it crashes the application.
So I wonder: is there a better way to consume data from several IAsyncEnumerable sources in parallel?
Keeping a pipeline running in case of exceptions is extremely difficult whether it's a functional or CSP pipeline. In most cases a pipeline will need to keep working even in case of individual message failures. A failing message doesn't mean the entire pipeline has failed.
That's why Railway-oriented programming is used to wrap messages and errors into Result<TOk,TError> wrappers and "redirect" or ignore error messages. Such a class makes programming Dataflow, Channels and IAsyncEnumerable pipelines a lot easier. In F#, using discriminated unions, one could define a Result type with just two union cases, Ok of 'T and Error of 'TError.
DUs aren't in C# yet, so various alternatives have been proposed, some using inheritance from an IResult<> base, some using classes/records which allow exhaustive pattern matching, something not available with the IResult<> techniques. Let's assume the Result<> here is a simple type that holds either a value or the exception that caused a failure.

The first step is to create a CopyAsync helper that copies all data from the input IAsyncEnumerable<Result<T>> to an output ChannelWriter<Result<T>>. This way, even if an exception is thrown, a Failure message is emitted instead of aborting the pipeline.
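A minimal sketch of both pieces follows. It assumes a record-based Result<T> with hypothetical Ok/Fail factory methods and an IsOk property; these names are illustrative and do not come from a library. CopyAsync catches whatever the source throws and writes it as a failed Result<T> instead of letting it escape.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical Result<T>: holds either a value or the exception that replaced it.
public record Result<T>(T? Value, Exception? Error)
{
    public bool IsOk => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);
}

public static class PipelineExtensions
{
    // Copies every message from input to output. If enumerating the source throws,
    // the exception is written to output as a failed Result<T> instead of propagating.
    public static async Task CopyAsync<T>(
        this IAsyncEnumerable<Result<T>> input,
        ChannelWriter<Result<T>> output,
        CancellationToken token = default)
    {
        try
        {
            await foreach (var msg in input.WithCancellation(token).ConfigureAwait(false))
            {
                await output.WriteAsync(msg).ConfigureAwait(false);
            }
        }
        catch (Exception exc)
        {
            await output.WriteAsync(Result<T>.Fail(exc)).ConfigureAwait(false);
        }
    }
}
```

Note that CopyAsync does not complete the writer: several copies may share one output channel, so completion is left to whoever owns the channel.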
With that, you can merge multiple sources by copying input messages to an output channel.
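A sketch of the merge step, assuming the same hypothetical Result<T> and CopyAsync (repeated here so the snippet compiles on its own); Merge is an illustrative name. All copies run concurrently into one bounded channel, and the channel is completed once every copy has finished.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public record Result<T>(T? Value, Exception? Error)
{
    public bool IsOk => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);
}

public static class MergeExtensions
{
    // Turns a source exception into a failed Result<T> message.
    public static async Task CopyAsync<T>(this IAsyncEnumerable<Result<T>> input,
        ChannelWriter<Result<T>> output, CancellationToken token = default)
    {
        try
        {
            await foreach (var msg in input.WithCancellation(token).ConfigureAwait(false))
                await output.WriteAsync(msg).ConfigureAwait(false);
        }
        catch (Exception exc)
        {
            await output.WriteAsync(Result<T>.Fail(exc)).ConfigureAwait(false);
        }
    }

    // Copies all sources concurrently into one channel with capacity 1 and
    // completes the writer when every copy is done.
    public static ChannelReader<Result<T>> Merge<T>(
        this IEnumerable<IAsyncEnumerable<Result<T>>> inputs,
        CancellationToken token = default)
    {
        var channel = Channel.CreateBounded<Result<T>>(1);
        _ = Task.WhenAll(inputs.Select(input => input.CopyAsync(channel.Writer, token)))
            .ContinueWith(t => { channel.Writer.TryComplete(t.Exception); },
                          TaskScheduler.Default);
        return channel.Reader;
    }
}
```

Because CopyAsync already converts source exceptions into messages, two failing sources simply produce two failure messages rather than competing unhandled exceptions.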
Using a bounded capacity of 1 maintains the backpressure behavior of downstream channels or consumers.
You can read all messages in a ChannelReader through ChannelReader.ReadAllAsync(CancellationToken).
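For example, here is a minimal consumer loop over a plain Channel<int>; the producer and the ReadAllDemo/RunAsync names are illustrative only. ReadAllAsync keeps yielding items until the writer side is completed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReadAllDemo
{
    // Produces a few items on a background task, then drains the reader.
    public static async Task<List<int>> RunAsync(CancellationToken token = default)
    {
        var channel = Channel.CreateBounded<int>(1);

        // Producer: completes the writer (passing on any error) when done.
        _ = Task.Run(async () =>
        {
            try
            {
                for (var i = 1; i <= 3; i++)
                    await channel.Writer.WriteAsync(i, token);
                channel.Writer.TryComplete();
            }
            catch (Exception exc)
            {
                channel.Writer.TryComplete(exc);
            }
        });

        var received = new List<int>();
        // Yields items until the writer has been completed.
        await foreach (var item in channel.Reader.ReadAllAsync(token))
            received.Add(item);
        return received;
    }
}
```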
You can avoid exposing the channel by returning an IAsyncEnumerable<> instead. You can also use System.Linq.Async to work on an IAsyncEnumerable<> using LINQ methods, e.g. to convert an IAsyncEnumerable<T> to an IAsyncEnumerable<Result<T>>, or to filter out failed messages before processing them.
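These three ideas can be sketched as follows, assuming the System.Linq.Async package is referenced (it supplies Select, Where and ToListAsync for IAsyncEnumerable<>). Result<T>, ReadResults, ToResults and OnlySuccesses are hypothetical names used for illustration.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;            // System.Linq.Async extension methods live in this namespace
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public record Result<T>(T? Value, Exception? Error)
{
    public bool IsOk => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);
}

public static class AsyncEnumerableResults
{
    // Hides the channel: callers only see an IAsyncEnumerable<>.
    public static IAsyncEnumerable<Result<T>> ReadResults<T>(
        this ChannelReader<Result<T>> reader, CancellationToken token = default)
        => reader.ReadAllAsync(token);

    // Wraps plain values so they can flow through a Result<T> pipeline.
    public static IAsyncEnumerable<Result<T>> ToResults<T>(this IAsyncEnumerable<T> source)
        => source.Select(value => Result<T>.Ok(value));

    // Drops failed messages and unwraps the values of the successful ones.
    public static IAsyncEnumerable<T> OnlySuccesses<T>(this IAsyncEnumerable<Result<T>> source)
        => source.Where(r => r.IsOk).Select(r => r.Value!);
}
```

Filtering is only one option; a consumer could just as well branch on IsOk and log or reroute the failures.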
You could create a method that applies a Func<T1,Task<T2>> to an input and propagates results or errors as results. This is a ... bit ... easier in F#.
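One way to write such a method in C#, sketched with a hypothetical Then extension (plus a stream version, ThenAll) over the same kind of Result<T> record. Exceptions thrown by the function become failures, and inputs that have already failed skip the function and pass their error along.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public record Result<T>(T? Value, Exception? Error)
{
    public bool IsOk => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);
}

public static class ResultExtensions
{
    // Applies func to a successful input; failures are propagated unchanged,
    // and exceptions thrown by func are captured as failures.
    public static async Task<Result<T2>> Then<T1, T2>(
        this Result<T1> input, Func<T1, Task<T2>> func)
    {
        if (!input.IsOk) return Result<T2>.Fail(input.Error!);
        try
        {
            return Result<T2>.Ok(await func(input.Value!).ConfigureAwait(false));
        }
        catch (Exception exc)
        {
            return Result<T2>.Fail(exc);
        }
    }

    // Applies Then to every message of an async stream, one at a time.
    public static async IAsyncEnumerable<Result<T2>> ThenAll<T1, T2>(
        this IAsyncEnumerable<Result<T1>> source, Func<T1, Task<T2>> func,
        [EnumeratorCancellation] CancellationToken token = default)
    {
        await foreach (var r in source.WithCancellation(token).ConfigureAwait(false))
            yield return await r.Then(func).ConfigureAwait(false);
    }
}
```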