I'm having trouble handling exceptions thrown from pipeline steps created with the Open.ChannelExtensions library. In some scenarios the exception is swallowed instead of being propagated to the caller.
From my observations, it seems to be related to the .Batch() step; the moment at which the exception is thrown may also matter.
Am I doing something wrong? How should this be handled so that the exception propagates up to the caller?
// See https://aka.ms/new-console-template for more information
using System.Threading.Channels;
using Open.ChannelExtensions;

var test = new Test();
try
{
    //await test.Scenario1(); // exception caught
    //await test.Scenario2(); // exception swallowed
    //await test.Scenario3(); // exception caught
    //await test.Scenario4(); // exception sometimes caught (~25% chance)
}
catch (Exception)
{
    Console.WriteLine("Got exception");
}
class Test
{
    public async Task Scenario1()
    {
        var channel = Channel.CreateBounded<int>(10000);
        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        var task = channel.Reader.Pipe(1, (element) =>
            {
                throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            //.Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });
        channel.Writer.TryComplete();
        await task;
        Console.WriteLine("end");
    }
    public async Task Scenario2()
    {
        var channel = Channel.CreateBounded<int>(10000);
        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        var task = channel.Reader.Pipe(1, (element) =>
            {
                throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            .Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });
        channel.Writer.TryComplete();
        await task;
    }
    public async Task Scenario3()
    {
        var channel = Channel.CreateBounded<int>(10000);
        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        var task = channel.Reader.Pipe(1, (element) =>
            {
                if (element == 20)
                    throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            //.Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });
        channel.Writer.TryComplete();
        await task;
    }
    public async Task Scenario4()
    {
        var channel = Channel.CreateBounded<int>(10000);
        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }
        var task = channel.Reader.Pipe(1, (element) =>
            {
                if (element == 20)
                    throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            .Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);
            })
            .ReadAll(task =>
            {
            });
        channel.Writer.TryComplete();
        await task;
    }
}
First, there's nothing wrong with using Channels for pipelines. That's what they were created for.
Error and exception handling in pipelines is harder than in plain method calls, though, whether you build them with Actors, Dataflow, Channels, IAsyncEnumerable, Observables and Reactive Extensions, or function chains.
This is especially true for IAsyncEnumerable and Channels: because they use a pull model, downstream steps won't notice that something is wrong until they try to read from the previous step.
On top of that, the Batch implementation doesn't seem to have explicit code to detect a failed source.
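The pull-model point can be seen with nothing but `System.Threading.Channels`. This is a minimal sketch, not code from Open.ChannelExtensions: the "upstream step" is simulated by completing the writer with an exception, and the names `channel` and `seen` are only illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();
var seen = new List<int>();

// Simulated upstream step: produces three items, then fails.
for (var i = 0; i < 3; i++)
{
    channel.Writer.TryWrite(i);
}
channel.Writer.Complete(new InvalidOperationException("upstream step failed"));

// Downstream step: the buffered items are still delivered. The failure only
// becomes visible once the reader has drained them and asks for more.
try
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        seen.Add(item);
        Console.WriteLine($"read {item}");
    }
}
catch (Exception ex)
{
    Console.WriteLine($"reader noticed the failure: {ex.GetType().Name}");
}
```

A step that never reads again, or that sits behind a stage which does not forward the failed completion, never reaches that `catch`.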
For this reason, exceptions should not escape from steps. A common pattern is to use a Result<TResult,TError> class to wrap the data passed among steps and either discard or forward error results. You'll often see this described as Railway Oriented Programming because error messages are moved to a "parallel" track that gets ignored by steps.

A lazy implementation could be:
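As a minimal sketch of that pattern (not the library's API): the `Result<T>` record below is a hypothetical, simplified variant of `Result<TResult,TError>` that fixes the error type to `Exception`, and `ParseStep` and `DoubleStep` are made-up steps used only for illustration.

```csharp
#nullable enable
using System;

foreach (var text in new[] { "1", "oops", "3" })
{
    var parsed = ParseStep(Result<string>.Ok(text));
    var doubled = DoubleStep(parsed);
    Console.WriteLine(doubled.IsOk
        ? $"ok: {doubled.Value}"
        : $"error: {doubled.Error!.GetType().Name}");
}
// Prints: ok: 2, error: FormatException, ok: 6 (one per line)

// A step that catches its own exceptions and moves them to the error track.
static Result<int> ParseStep(Result<string> input)
{
    // Errors from earlier steps are forwarded untouched.
    if (!input.IsOk) return Result<int>.Fail(input.Error!);
    try
    {
        return Result<int>.Ok(int.Parse(input.Value!));
    }
    catch (Exception ex)
    {
        return Result<int>.Fail(ex);
    }
}

// A step that cannot throw simply ignores error results.
static Result<int> DoubleStep(Result<int> input) =>
    input.IsOk ? Result<int>.Ok(input.Value * 2) : input;

// Hypothetical wrapper: either a value or the exception that replaced it.
public record Result<T>(T? Value, Exception? Error)
{
    public bool IsOk => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);
}
```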
Such code can easily be bundled in a generic method. The best place to put it would be inside the Pipe and Transform methods. Unfortunately, Open.ChannelExtensions doesn't follow this model.

Such a method could be:
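One possible shape for it, as a sketch: `Pipeline.Call` and the `Result<T>` record are hypothetical names chosen here, not part of Open.ChannelExtensions.

```csharp
#nullable enable
using System;

var good = Pipeline.Call(Result<string>.Ok("21"), s => int.Parse(s) * 2);
var bad = Pipeline.Call(Result<string>.Ok("oops"), s => int.Parse(s) * 2);
var skipped = Pipeline.Call(bad, n => n + 1); // the lambda is never invoked

Console.WriteLine(good.IsOk ? $"ok: {good.Value}" : "error");
Console.WriteLine(bad.IsOk ? "ok" : $"error: {bad.Error!.GetType().Name}");
Console.WriteLine(ReferenceEquals(bad.Error, skipped.Error)
    ? "error forwarded unchanged"
    : "error replaced");

public static class Pipeline
{
    // Runs one step on a wrapped input. Error inputs are forwarded as-is,
    // and any exception thrown by the step becomes an error result.
    public static Result<TOut> Call<TIn, TOut>(Result<TIn> input, Func<TIn, TOut> step)
    {
        if (!input.IsOk) return Result<TOut>.Fail(input.Error!);
        try
        {
            return Result<TOut>.Ok(step(input.Value!));
        }
        catch (Exception ex)
        {
            return Result<TOut>.Fail(ex);
        }
    }
}

// Hypothetical wrapper: either a value or the exception that replaced it.
public record Result<T>(T? Value, Exception? Error)
{
    public bool IsOk => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);
}
```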
Which can be used like this:
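A sketch of such usage, kept to `System.Threading.Channels` so that it runs without extra packages. `Step` is a hypothetical single-reader stand-in for a library step such as `Pipe`, and `Result<T>`, `Pipeline.Call` and the `Call` extension are assumed helper names. The second stage uses the wrapper in its extension-method form on `Func<TIn,TOut>`.

```csharp
#nullable enable
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var source = Channel.CreateUnbounded<Result<string>>();
foreach (var text in new[] { "1", "oops", "3" })
{
    source.Writer.TryWrite(Result<string>.Ok(text));
}
source.Writer.Complete();

// Plain static call: the step body can throw freely.
var parsed = Step(source.Reader, msg => Pipeline.Call(msg, s => int.Parse(s)));

// Extension form on Func<TIn,TOut>.
Func<int, int> doubleIt = n => n * 2;
var doubled = Step(parsed, msg => doubleIt.Call(msg));

await foreach (var result in doubled.ReadAllAsync())
{
    Console.WriteLine(result.IsOk
        ? $"ok: {result.Value}"
        : $"error: {result.Error!.GetType().Name}");
}

// Hypothetical stand-in for a pipeline step: reads, transforms, writes.
static ChannelReader<TOut> Step<TIn, TOut>(ChannelReader<TIn> input, Func<TIn, TOut> transform)
{
    var output = Channel.CreateUnbounded<TOut>();
    _ = Task.Run(async () =>
    {
        try
        {
            await foreach (var item in input.ReadAllAsync())
            {
                await output.Writer.WriteAsync(transform(item));
            }
            output.Writer.Complete();
        }
        catch (Exception ex)
        {
            output.Writer.Complete(ex); // forward a failed source downstream
        }
    });
    return output.Reader;
}

public static class Pipeline
{
    public static Result<TOut> Call<TIn, TOut>(Result<TIn> input, Func<TIn, TOut> step)
    {
        if (!input.IsOk) return Result<TOut>.Fail(input.Error!);
        try { return Result<TOut>.Ok(step(input.Value!)); }
        catch (Exception ex) { return Result<TOut>.Fail(ex); }
    }

    public static Result<TOut> Call<TIn, TOut>(this Func<TIn, TOut> step, Result<TIn> input) =>
        Call(input, step);
}

public record Result<T>(T? Value, Exception? Error)
{
    public bool IsOk => Error is null;
    public static Result<T> Ok(T value) => new(value, null);
    public static Result<T> Fail(Exception error) => new(default, error);
}
```

Because the bad item travels as an error result, the items before and after it are still processed and the pipeline completes normally.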
Call could also be an extension method on Func<T,U>, allowing cleaner code.

To abort the entire pipeline, a CancellationTokenSource should be used to signal cancellation to all steps in the pipeline, both upstream and downstream. In case of critical exceptions, the exception handler inside the step should signal the CTS to abort the entire pipeline.
It's worth noting that Pipe etc. all accept a CancellationToken for this reason.

In less critical situations a step could simply stop producing results. This would allow downstream steps to process any remaining good data.
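The cancellation approach can be sketched with plain channels as follows. This is an illustration under assumptions, not the library's implementation: the producer and consumer are hand-written loops, and "item 5 is a critical failure" is an invented condition.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var channel = Channel.CreateUnbounded<int>();

// Upstream step: keeps producing until the shared token is cancelled.
var producer = Task.Run(async () =>
{
    try
    {
        for (var i = 0; ; i++)
        {
            await channel.Writer.WriteAsync(i, cts.Token);
            await Task.Delay(10, cts.Token);
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("producer stopped");
    }
    finally
    {
        channel.Writer.TryComplete();
    }
});

// Downstream step: its exception handler signals the CTS on a critical error.
var consumer = Task.Run(async () =>
{
    try
    {
        await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
        {
            try
            {
                if (item == 5) throw new InvalidOperationException("critical failure");
                Console.WriteLine($"processed {item}");
            }
            catch (InvalidOperationException ex)
            {
                Console.WriteLine($"aborting pipeline: {ex.Message}");
                cts.Cancel(); // reaches both upstream and downstream steps
            }
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("consumer stopped");
    }
});

await Task.WhenAll(producer, consumer);
```

Every step observes the same token, so the failing step does not need to know which other steps exist.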