Can a TPL Dataflow ActionBlock be Reset after a Fault?

I have a TPL Dataflow ActionBlock that I'm using to receive trigger messages for a camera and then do some processing. If the processing task throws an exception, the ActionBlock enters the faulted state. I would like to send a faulted message to my UI and send a reset message to the ActionBlock so it can continue processing incoming trigger messages. Is there a way to return the ActionBlock to a ready state (clear the fault)?

Code for the curious:

using System.Threading.Tasks.Dataflow;

namespace Anonymous
{
    /// <summary>
    /// Provides a messaging system between objects that inherit from Actor
    /// </summary>
    public abstract class Actor
    {
        //The Actor uses an ActionBlock from the DataFlow library.  An ActionBlock has an input queue you can 
        // post messages to and an action that will be invoked for each received message.

        //The ActionBlock handles all of the threading issues internally so that we don't need to deal with 
        // threads or tasks. Thread-safety comes from the fact that ActionBlocks are serialized by default. 
        // If you send two messages to it at the same time it will buffer the second message until the first 
        // has been processed.
        private ActionBlock<Message> _action;

        ...Properties omitted for brevity...

        public Actor(string name, int id)
        {
            _name = name;
            _id = id;
            CreateActionBlock();
        }

        private void CreateActionBlock()
        {
            // We create an action that will convert the actor and the message to dynamic objects 
            // and then call the HandleMessage method.  This means that the runtime will look up 
            // a method called ‘HandleMessage’ with a parameter of the message type and call it.

            // in TPL Dataflow if an exception goes unhandled during the processing of a message, 
            // (HandleMessage) the exception will fault the block’s Completion task.

            //Dynamic objects expose members such as properties and methods at run time, instead 
            // of at compile time. This enables you to create objects to work with structures that 
            // do not match a static type or format. 
            _action = new ActionBlock<Message>(message =>
            {
                dynamic self = this;
                dynamic msg = message;
                self.HandleMessage(msg); //implement HandleMessage in the derived class
            }, new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 1  // This specifies a maximum degree of parallelism of 1.
                                            // This causes the dataflow block to process messages serially.
            });
        }

        /// <summary>
        /// Send a message to an internal ActionBlock for processing
        /// </summary>
        /// <param name="message"></param>
        public async void SendMessage(Message message)
        {
            if (message.Source == null)
                throw new Exception("Message source cannot be null.");
            try
            {
                _action.Post(message);
                await _action.Completion;
                message = null;
                //in TPL Dataflow if an exception goes unhandled during the processing of a message, 
                // the exception will fault the block’s Completion task.
            }
            catch(Exception ex)
            {
                _action.Completion.Dispose();
                //throw new Exception("ActionBlock for " + _name + " failed.", ex);
                Trace.WriteLine("ActionBlock for " + _name + " failed." + ExceptionExtensions.GetFullMessage(ex));

                if (_action.Completion.IsFaulted)
                {
                    _isFaulted = true;
                    _faultReason = _name + " ActionBlock encountered an exception while processing task: " + ex.ToString();
                    FaultMessage msg = new FaultMessage { Source = _name, FaultReason = _faultReason, IsFaulted = _isFaulted };
                    OnFaulted(msg);
                    CreateActionBlock();
                }
            }
        }

        public event EventHandler<FaultMessageEventArgs> Faulted;
        public void OnFaulted(FaultMessage message)
        {
            Faulted?.Invoke(this, new FaultMessageEventArgs { Message = message.Copy() });
            message = null;
        }

        /// <summary>
        /// Use to await the message processing result
        /// </summary>
        public Task Completion
        {
            get
            {
                _action.Complete();
                return _action.Completion;
            }
        }
    }
}

Answer by Panagiotis Kanavos (best answer)

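An unhandled exception in an ActionBlock is like an unhandled exception in an application: it ends the block. A faulted block can't be reset. It declines all further messages, so the only way to continue is to create a new block. Don't let it get that far. Handle the exception inside the block's delegate.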
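
A minimal sketch of what an unhandled exception does to a block. The `FaultDemo` class and its `RunAsync` method are hypothetical names used only for illustration. It posts one message that makes the delegate throw, waits for the block to fault, and then tries to post again:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the original question or answer.
public static class FaultDemo
{
    // Returns the block's faulted state and whether it accepted a message
    // that was posted after the failure.
    public static async Task<(bool IsFaulted, bool Accepted)> RunAsync()
    {
        var block = new ActionBlock<int>(n =>
        {
            // Nothing catches this, so it faults the block.
            if (n < 0)
                throw new InvalidOperationException("negative input");
        });

        block.Post(-1);
        try
        {
            // Awaiting Completion rethrows the delegate's exception.
            await block.Completion;
        }
        catch (Exception)
        {
            // The block is faulted at this point.
        }

        // A faulted block declines new messages, so Post returns false.
        bool accepted = block.Post(1);
        return (block.Completion.IsFaulted, accepted);
    }
}
```

`RunAsync` reports `IsFaulted == true` and `Accepted == false`: nothing on the block puts it back into a working state.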

In the simplest case, log the exception or otherwise deal with it inside the block's delegate. In more complex scenarios you can use a TransformBlock instead of an ActionBlock and send Success or Failure messages to downstream blocks.
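
The TransformBlock approach could look roughly like the sketch below. `Result`, `ResultPipeline`, `Process` and the rule that negative triggers fail are all assumptions made for illustration. In a real application the downstream block would notify the UI instead of filling a list.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical message type: records whether processing succeeded.
public sealed class Result
{
    public int Input { get; set; }
    public bool Success { get; set; }
    public string Error { get; set; }
}

public static class ResultPipeline
{
    // Hypothetical processing step that fails for negative triggers.
    private static void Process(int trigger)
    {
        if (trigger < 0)
            throw new InvalidOperationException("Bad trigger " + trigger);
    }

    public static async Task<List<Result>> RunAsync(IEnumerable<int> triggers)
    {
        var results = new List<Result>();

        // Exceptions are caught inside the delegate and turned into
        // Failure messages, so the block itself never faults.
        var worker = new TransformBlock<int, Result>(trigger =>
        {
            try
            {
                Process(trigger);
                return new Result { Input = trigger, Success = true };
            }
            catch (Exception exc)
            {
                return new Result { Input = trigger, Success = false, Error = exc.Message };
            }
        });

        // The downstream block sees both outcomes. It processes one
        // message at a time by default, so List.Add is safe here.
        var reporter = new ActionBlock<Result>(result => results.Add(result));

        worker.LinkTo(reporter, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var trigger in triggers)
            await worker.SendAsync(trigger);

        worker.Complete();
        await reporter.Completion;
        return results;
    }
}
```

Calling `ResultPipeline.RunAsync(new[] { 1, -2, 3 })` yields three results in input order, with only the second marked as a failure. The pipeline keeps running after the bad trigger.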

The code you posted, though, has some critical issues. Dataflow blocks aren't agents and agents aren't dataflow blocks. You can use one to build the other, of course, but they represent different paradigms. In this case your Actor emulates ActionBlock's own API, with several bugs.

For example, you don't need to write your own send method, because blocks already have Post and SendAsync. You should not complete the block once you have sent a message, because it won't be able to handle any other messages. Only call Complete() when you really don't want to use the ActionBlock any more. You also don't need to set MaxDegreeOfParallelism to 1, since that's the default value.

You can set a bound on a dataflow block with BoundedCapacity so that it accepts only, e.g., 10 messages at a time. Otherwise all messages are buffered until the block gets the chance to process them.

You could replace all of this code with the following:

using System;
using System.Threading.Tasks.Dataflow;

// Handle exceptions inside the delegate so they never fault the block
void MyMethod(MyMessage message)
{
    try
    {
        //...
    }
    catch (Exception exc)
    {
        // ToString() includes the complete exception, no need for anything more
        _log.Error(exc.ToString());
    }
}

var blockOptions = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 10,
    NameFormat = "Block for MyMessage {0} {1}"
};
var block = new ActionBlock<MyMessage>(MyMethod, blockOptions);

for (int i = 0; i < 10000; i++)
{
    // Waits asynchronously while the block is at its bounded capacity of 10
    await block.SendAsync(new MyMessage(i));
}
block.Complete();
// Await until all leftover messages are processed
await block.Completion;

Notice the call to Exception.ToString(). This will generate a string containing all exception information, including the call stack.

NameFormat lets you specify a name template for a block. The runtime fills {0} with the block's internal name and {1} with the block's ID.