Exception handling in Open.ChannelExtensions pipelines


I'm having trouble handling exceptions thrown from pipeline steps created with the Open.ChannelExtensions library. In some scenarios the exception is swallowed instead of being propagated to the caller.

From my observations, it seems to be related to the .Batch() step, and the moment at which the exception is thrown may also matter.

Am I doing something wrong? How should this be handled so that the exception propagates up to the caller?

// See https://aka.ms/new-console-template for more information

using System.Threading.Channels;
using Open.ChannelExtensions;

var test = new Test();
try
{
    //await test.Scenario1();   //exception caught
    //await test.Scenario2();   //exception swallowed
    //await test.Scenario3();   //exception caught
    //await test.Scenario4();   //exception sometimes caught (~25% chance)
}
catch (Exception)
{
    Console.WriteLine("Got exception");
}


class Test
{
    public async Task Scenario1()
    {
        var channel = Channel.CreateBounded<int>(10000);

        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        var task = channel.Reader.Pipe(1, (element) =>
            {
                throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            //.Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);

            })
            .ReadAll(task =>
            {
            });

        channel.Writer.TryComplete();
        await task;

        Console.WriteLine("end");
    }

    public async Task Scenario2()
    {
        var channel = Channel.CreateBounded<int>(10000);

        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        var task = channel.Reader.Pipe(1, (element) =>
            {
                throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            .Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);

            })
            .ReadAll(task =>
            {
            });

        channel.Writer.TryComplete();
        await task;
    }

    public async Task Scenario3()
    {
        var channel = Channel.CreateBounded<int>(10000);

        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        var task = channel.Reader.Pipe(1, (element) =>
            {
                if (element == 20)
                    throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            //.Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);

            })
            .ReadAll(task =>
            {
            });

        channel.Writer.TryComplete();
        await task;
    }

    public async Task Scenario4()
    {
        var channel = Channel.CreateBounded<int>(10000);

        for (int i = 0; i < 100; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        var task = channel.Reader.Pipe(1, (element) =>
            {
                if (element == 20)
                    throw new Exception();
                Console.WriteLine(element);
                return 1;
            })
            .Pipe(2, (evt) =>
            {
                Console.WriteLine("\t" + evt);
                return evt * 2;
            })
            .Batch(20)
            .PipeAsync(1, async (evt) =>
            {
                Console.WriteLine("\t\t" + evt);
                return Task.FromResult(evt);

            })
            .ReadAll(task =>
            {
            });

        channel.Writer.TryComplete();
        await task;
    }
}

Answer by Panagiotis Kanavos

First, there's nothing wrong with using Channels for pipelines. That's what they were created for.

Error and exception handling in pipelines is harder than plain old calls though, whether you build them using Actors, Dataflow, Channels, IAsyncEnumerable, Observable and Reactive Extensions or function chains:

  • The code that receives the results and any possible exceptions is the next step in the pipeline.
  • The only way such an exception can be handled is to nuke the pipeline, unless you use an Actor model, in which case a supervisor may decide to just restart that actor.
  • But since only downstream steps receive results and exceptions, there's no way to abort/nuke the upstream steps.

Especially for IAsyncEnumerable and Channels, since they use a pull model, downstream steps won't notice that something is wrong until they try to read from the previous step.
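The pull behaviour can be seen with a plain System.Threading.Channels channel, without any library on top. This is only a minimal sketch: the channel contents and the exception message are made up for illustration. The writer fails after two items were buffered, yet the reader only learns about it once it has drained the buffer and asks for more.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);

// The "upstream step" fails and reports the error through the writer.
channel.Writer.Complete(new InvalidOperationException("upstream failed"));

// Items that were already buffered can still be read normally.
while (channel.Reader.TryRead(out var item))
{
    Console.WriteLine($"read {item}");
}

// Only after the buffer is drained does the failure surface downstream.
try
{
    await channel.Reader.Completion;
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"downstream saw: {ex.Message}");
}
```

If nothing ever reads from the channel or observes Completion, the error stays unobserved, which is one way an exception can appear to be swallowed.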

On top of that the Batch implementation doesn't seem to have explicit code to detect a failed source.

For this reason, exceptions should not escape from steps. A common pattern is to use a Result<TResult,TError> class to wrap the data passed among steps and either discard or forward error results. You'll often see this described as Railway Oriented Programming because error messages are moved to a "parallel" track that gets ignored by steps.

A lazy implementation could be:

record Result<T>(T? Message, Exception? Error)
{
    public bool HasError => Error != null;

    // Carries the error over to a result of another message type
    public Result<U> ToError<U>() => new Result<U>(default, Error);

    public static Result<T> Ok(T value) => new Result<T>(value, null);

    public static Result<T> Fail(Exception exc) => new Result<T>(default, exc);
}

...
channel.Reader.Pipe(1, (element) =>
            {
                try
                {
                    Console.WriteLine(element);
                    return Result<int>.Ok(1);
                }
                catch(Exception exc)
                {
                    return Result<int>.Fail(exc);
                }
            })
            .Pipe(2, (msg) =>
            {
                if (msg.HasError)
                {
                    return msg.ToError<string>();
                }
                try
                {
                ...
                }
                ...
            })

Such code can easily be bundled in a generic method. The best place to put it would be inside the Pipe and Transform methods. Unfortunately, Open.ChannelExtensions doesn't follow this model.

Such a method could be:

public static Result<U> Call<T,U>(Result<T> msg,Func<T,U> func)
{
    if (msg.HasError)
    {
        return msg.ToError<U>();
    }
    try
    {
        return Result<U>.Ok(func(msg.Message));
    }
    catch(Exception exc)
    {
        return Result<U>.Fail(exc);
    }
}

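Which can be used like this. The first step receives raw elements, so it wraps them in an Ok result before calling the function:

channel.Reader.Pipe(1, (element) =>
{
    return Call(Result<int>.Ok(element), elt =>
    {
        Console.WriteLine(elt);
        return 1;
    });
}).Pipe(2,...)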
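To see how an error travels along the "parallel track", here is a small self-contained sketch that chains three steps without any channels. The step functions (addOne, format, length) and the Run helper are hypothetical names made up for this example; Result and Call follow the shape described above.

```csharp
#nullable enable
using System;

// Hypothetical steps: the second one fails when it receives the value 2.
Func<int, int> addOne = x => x + 1;
Func<int, string> format = x =>
    x == 2 ? throw new InvalidOperationException("bad value") : $"value {x}";
Func<string, int> length = s => s.Length;

// Chains the steps; once a step fails, later steps only forward the error.
Result<int> Run(int input) =>
    Call(Call(Call(Result<int>.Ok(input), addOne), format), length);

Console.WriteLine(Run(5).Message);        // prints 7 (the length of "value 6")
Console.WriteLine(Run(1).Error?.Message); // prints "bad value"

// Runs func on the message, or forwards an existing error untouched.
static Result<U> Call<T, U>(Result<T> msg, Func<T, U> func)
{
    if (msg.HasError)
    {
        return msg.ToError<U>();
    }
    try
    {
        return Result<U>.Ok(func(msg.Message!));
    }
    catch (Exception exc)
    {
        return Result<U>.Fail(exc);
    }
}

record Result<T>(T? Message, Exception? Error)
{
    public bool HasError => Error != null;
    public Result<U> ToError<U>() => new Result<U>(default, Error);
    public static Result<T> Ok(T value) => new Result<T>(value, null);
    public static Result<T> Fail(Exception exc) => new Result<T>(default, exc);
}
```

For the input 1 the exception thrown by format is captured once, and length never runs; it just converts the failed Result<string> into a failed Result<int>.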

Call could also be an extension method on Func<T,U> allowing cleaner code:

public static Result<U> Call<T,U>(this Func<T,U> func,Result<T> msg)
...
channel.Reader.Pipe(1, (element) =>
        myFunction.Call(element)
    )

To abort the entire pipeline, a CancellationTokenSource should be used to signal cancellation to all steps in the pipeline, both up- and downstream. In case of a critical exception, the exception handler inside the step should signal the CTS to abort the entire pipeline.

It's worth noting that Pipe and the related methods all accept a CancellationToken for this reason.
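A rough sketch of that idea follows. To avoid guessing at the exact Open.ChannelExtensions overloads, it uses hand-written steps on plain System.Threading.Channels channels; the step bodies, capacities and the failing value 20 are assumptions chosen to mirror the question. With the library, the same token would be passed to each Pipe call instead.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var source = Channel.CreateBounded<int>(100);
var output = Channel.CreateBounded<int>(100);

// Upstream producer: stops writing as soon as the shared token is cancelled.
var producer = Task.Run(async () =>
{
    try
    {
        for (var i = 0; i < 1000; i++)
            await source.Writer.WriteAsync(i, cts.Token);
    }
    catch (OperationCanceledException) { /* pipeline was aborted */ }
    finally { source.Writer.TryComplete(); }
});

// Middle step: a critical failure cancels the token, which reaches the
// producer (upstream) and the consumer (downstream) alike.
var step = Task.Run(async () =>
{
    try
    {
        await foreach (var item in source.Reader.ReadAllAsync(cts.Token))
        {
            if (item == 20) throw new InvalidOperationException("critical failure");
            await output.Writer.WriteAsync(item * 2, cts.Token);
        }
        output.Writer.TryComplete();
    }
    catch (Exception ex)
    {
        cts.Cancel();                   // abort every other step
        output.Writer.TryComplete(ex);  // also report the error downstream
    }
});

// Downstream consumer: observes either the cancellation or the original error.
var processed = 0;
Exception? failure = null;
try
{
    await foreach (var item in output.Reader.ReadAllAsync(cts.Token))
        processed++;
}
catch (Exception ex) { failure = ex; }

await Task.WhenAll(producer, step);
Console.WriteLine($"processed {processed} items, failure: {failure?.GetType().Name}");
```

How many items get processed, and whether the consumer sees the cancellation or the original exception first, depends on timing. What matters is that the consumer always ends with a failure it can rethrow to the caller, and that the producer does not keep filling the channel after the abort.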

In less critical situations a step could simply stop producing results. This would allow downstream steps to process any remaining good data.