Using BlockingCollection<T>: OperationCanceledException, is there a better way?

I'm making use of the (frankly great) BlockingCollection<T> type for a heavily multithreaded, high-performance app.

There's a lot of throughput through the collection and on the micro-level it's highly performant. However, each 'batch' is always ended by flagging the cancellation token. This results in an exception being thrown on any waiting Take call. That's fine, but I would have settled for a return value or output parameter to signal it, because a) exceptions have an obvious overhead and b) when debugging, I don't want to manually turn off break-on-exception for that specific exception.
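
To make the behaviour concrete, here is a minimal sketch of the situation described above, assuming a single consumer blocked on an empty collection. The variable names are illustrative only and not taken from the real application.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

using var collection = new BlockingCollection<int>();
using var cts = new CancellationTokenSource();

bool cancelledByException = false;
var consumer = new Thread(() =>
{
    try
    {
        // The collection is empty, so Take can only end through the token.
        collection.Take(cts.Token);
    }
    catch (OperationCanceledException)
    {
        // The only signal Take offers for cancellation is this exception.
        cancelledByException = true;
    }
});
consumer.Start();

cts.Cancel();      // "end of batch" in the question's terms
consumer.Join();
Console.WriteLine($"Cancelled by exception: {cancelledByException}");
```

With many waiting consumers, every one of them goes through that catch block at the end of each batch.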

The implementation seems intense, and in theory I suppose I could disassemble and recreate my own version that didn't use exceptions, but perhaps there's a less complex way?

I could add a null (or if not, a placeholder) object to the collection to signify the process should end, however there also needs to be a means to abort nicely, i.e. wake up waiting threads and tell them somehow that something's gone on.

So - alternative collection types? Recreate my own? Some way to abuse this one?

(Some context: I went with BlockingCollection<T> because it has an advantage over manual locking around a Queue. As best I can tell the use of threading primitives is superb, and in my case a few milliseconds here and there and optimal core use are crucial.)

Edit: I've just opened a bounty for this one. I don't believe Anastasiosyal's answer covers the query I raise in my comment of it. I know this is a tough problem. Is anyone able to assist?

There are 5 answers

3
Anastasiosyal On

As I guess you have already done yourself, looking into the reflected source of BlockingCollection, it unfortunately looks like you will get the OperationCanceledException whenever a CancellationToken is passed into the BlockingCollection and that token is cancelled. This can be seen in the image below (with a couple of workarounds after the image).

GetConsumingEnumerable invokes TryTakeWithNoTimeValidation on the BlockingCollection which in turn raises this exception.

[Image: reflected source of BlockingCollection<T>]

Workaround #1

One potential strategy, assuming you have control over your producers and your consumers, is to pass the cancellation token into the producers and consumers themselves rather than into the BlockingCollection (which is what raises this exception).

If your producers aren't producing and your consumers aren't consuming, then you have effectively cancelled the operation without raising this exception, while passing CancellationToken.None to the BlockingCollection.

Special cases: cancelling when the BlockingCollection is at BoundedCapacity or empty

Producers blocked: Producer threads block once the BlockingCollection reaches BoundedCapacity. If you cancel while the collection is full (which means the consumers are not blocked, but the producers are, because they cannot add any more items), you need to let additional items be consumed, one for each producer thread. That unblocks the producers and in turn allows the cancellation logic to kick in on the producer side.

Consumers blocked: When your consumers are blocked because the queue is empty, you can insert an empty unit of work (one for each consumer thread) into the BlockingCollection to unblock the consumer threads and allow the cancellation logic to kick in on the consumer side.

When there are items in the queue and no limit such as BoundedCapacity or Empty has been reached then the producers and consumer threads should not be blocked.
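
As a rough sketch of Workaround #1, the producer and consumer loops below check the token themselves and never hand it to the collection. Note one deliberate substitution: instead of injecting extra items to unblock waiting threads, this sketch uses the timeout overloads of TryTake and TryAdd, which return false on expiry rather than throwing. The 50 ms timeout, the capacity of 4 and the item count are arbitrary assumptions.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

using var queue = new BlockingCollection<int>(4);   // bounded to 4 items
using var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
int consumed = 0;

// Consumer: the loop owns the cancellation check, the collection never sees the token.
Task consumer = Task.Run(() =>
{
    while (!token.IsCancellationRequested)
    {
        // Returns false after 50 ms if nothing arrived; no exception involved.
        if (queue.TryTake(out int item, 50))
            Interlocked.Increment(ref consumed);
    }
});

// Producer: TryAdd with a timeout, so a full queue cannot block it forever.
Task producer = Task.Run(() =>
{
    for (int i = 0; i < 10; i++)
    {
        while (!queue.TryAdd(i, 50))
            if (token.IsCancellationRequested) return;
    }
});

producer.Wait();
while (Volatile.Read(ref consumed) < 10) Thread.Sleep(10);   // let the batch drain
cts.Cancel();
consumer.Wait();
Console.WriteLine($"Consumed {consumed} items without a cancellation exception");
```

The trade-off is that idle threads wake up on every timeout and cancellation can take up to one timeout period to be noticed, which may or may not be acceptable for a latency-sensitive app.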

Workaround #2

Using a cancellation unit of work.

When your application needs to cancel, then your producers (maybe just 1 producer will suffice while the others just cancel producing) will produce a cancellation unit of work (could be null as you also mention or some class that implements a marker interface). When the consumers consume this unit of work and detect that it is in fact a cancellation unit of work, their cancellation logic kicks in. The number of cancellation units of work to be produced needs to equal the number of consumer threads.

Again, caution is needed when we are close to BoundedCapacity, as it could be a sign that some of the producers are blocked. Depending on the number of producers/consumers you could have a consumer consuming until all producers (but 1) have shut down. This ensures that there are no lingering producers around. When there is only 1 producer left, your last consumer can shut down and the producer can stop producing cancellation units of work.
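
A minimal sketch of the cancellation unit of work, assuming an unbounded collection, three consumers and null as the marker (all illustrative choices). The bounded-capacity concerns described above are not handled here.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

const int ConsumerCount = 3;
using var queue = new BlockingCollection<string?>();
int processed = 0;

Task[] consumers = Enumerable.Range(0, ConsumerCount).Select(_ => Task.Run(() =>
{
    while (true)
    {
        string? item = queue.Take();   // no token passed, so nothing to throw on cancel
        if (item is null) break;       // null is the cancellation unit of work
        Interlocked.Increment(ref processed);
    }
})).ToArray();

foreach (string job in new[] { "a", "b", "c", "d" }) queue.Add(job);

// One marker per consumer thread: each consumer stops after taking exactly one.
for (int i = 0; i < ConsumerCount; i++) queue.Add(null);

Task.WaitAll(consumers);
Console.WriteLine($"Processed {processed} items");
```

Because the markers are queued behind the real work, this gives a drain-then-stop shutdown rather than an immediate abort.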

1
Theodor Zoulias On

My suggestion is to implement this functionality by encapsulating an asynchronous queue, like the BufferBlock<T> class from the TPL Dataflow library. This class is a thread-safe container intended for producer-consumer scenarios, and supports backpressure (BoundedCapacity) just like the BlockingCollection<T> class. Being asynchronous means that the corresponding Add/Take methods (SendAsync/ReceiveAsync) return tasks. These tasks store the event of a cancellation as internal state that can be queried with the IsCanceled property, so throwing exceptions internally can be avoided. Propagating this state with exceptions can also be avoided, by waiting on the tasks using an exception-suppressing continuation (ContinueWith). Here is an implementation:

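// BufferBlock<T> lives in the System.Threading.Tasks.Dataflow package.
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

/// <summary>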
/// A thread-safe collection that provides blocking and bounding capabilities.
/// The cancellation is propagated as a false result, and not as an exception.
/// </summary>
public class CancellationFriendlyBlockingCollection<T>
{
    private readonly BufferBlock<T> _bufferBlock;

    public CancellationFriendlyBlockingCollection()
    {
        _bufferBlock = new BufferBlock<T>();
    }

    public CancellationFriendlyBlockingCollection(int boundedCapacity)
    {
        _bufferBlock = new BufferBlock<T>(new() { BoundedCapacity = boundedCapacity });
    }

    public bool TryAdd(T item, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested) return false;
        if (_bufferBlock.Post(item)) return true;
        Task<bool> task = _bufferBlock.SendAsync(item, cancellationToken);
        WaitNoThrow(task);
        if (!task.IsCompletedSuccessfully) return false;
        return task.Result;
    }

    public bool TryTake(out T item, CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested) { item = default; return false; }
        if (_bufferBlock.TryReceive(out item)) return true;
        Task<T> task = _bufferBlock.ReceiveAsync(cancellationToken);
        WaitNoThrow(task);
        if (!task.IsCompletedSuccessfully) return false;
        item = task.Result; return true;
    }

    public IEnumerable<T> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        while (TryTake(out var item, cancellationToken)) yield return item;
    }

    public void CompleteAdding() => _bufferBlock.Complete();
    public bool IsCompleted => _bufferBlock.Completion.IsCompleted;
    public int Count => _bufferBlock.Count;

    // Wait for the task to complete without throwing exceptions
    private static void WaitNoThrow(Task task)
    {
        if (task.IsCompleted) return;
        task.ContinueWith(_ => { }, default,
            TaskContinuationOptions.ExecuteSynchronously |
            TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default).Wait();
        Debug.Assert(task.IsCompleted);
    }
}
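
The core trick in the WaitNoThrow helper above can be tried in isolation. The sketch below is only an illustration: it uses Task.Delay with an infinite timeout as a stand-in for a pending ReceiveAsync call, and the 50 ms delay before cancelling is arbitrary.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();

// A task that can only finish by being cancelled (stand-in for ReceiveAsync).
Task pending = Task.Delay(Timeout.Infinite, cts.Token);
cts.CancelAfter(50);

// Calling pending.Wait() directly would throw an AggregateException here.
// Waiting on an empty continuation blocks until 'pending' finishes,
// whatever its outcome, and the continuation itself completes normally.
pending.ContinueWith(_ => { }, CancellationToken.None,
    TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Wait();

// The cancellation is now visible as state rather than as an exception.
bool wasCanceled = pending.IsCanceled;
Console.WriteLine($"IsCanceled: {wasCanceled}");
```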

Performance: The CancellationFriendlyBlockingCollection.TryTake method can be invoked with a canceled CancellationToken in a loop with a frequency of about 15,000,000 times per second on my PC (on a single thread). For comparison, the frequency of the BlockingCollection<T>.Take under the same conditions is about 20,000 times per second.

You might be tempted to replace the BufferBlock<T> with a more modern asynchronous queue like the Channel<T>. In that case please make sure to read this question first, in order to be aware of a leaky behavior of this class under specific conditions.

6
Alois Kraus On

How about the BlockingQueue I did a while ago?

http://apichange.codeplex.com/SourceControl/changeset/view/76c98b8c7311#ApiChange.Api%2fsrc%2fInfrastructure%2fBlockingQueue.cs

It should do fine without any exceptions. The current queue simply closes the event on dispose, which might not be what you want. You might want to enqueue a null and wait until all items have been processed. Apart from this it should suit your needs.

using System.Collections.Generic;
using System.Collections;
using System.Threading;
using System;

namespace ApiChange.Infrastructure
{

    /// <summary>
    /// A blocking queue which supports end markers to signal that no more work is left by inserting
    /// a null reference. This constrains the queue to reference types only. 
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class BlockingQueue<T> : IEnumerable<T>, IEnumerable, IDisposable where T : class
    {
        /// <summary>
        /// The queue used to store the elements
        /// </summary>
        private Queue<T> myQueue = new Queue<T>();
        bool myAllItemsProcessed = false;
        ManualResetEvent myEmptyEvent = new ManualResetEvent(false);

        /// <summary>
        /// Dequeues an element from the queue and returns it.
        /// If the queue is empty the thread will block. If the queue is stopped it will immediately
        /// return with null.
        /// </summary>
        /// <returns>An object of type T</returns>      
        public T Dequeue()
        {
            if (myAllItemsProcessed)
                return null;

            lock (myQueue)
            {
                while (myQueue.Count == 0) 
                {
                    if(!Monitor.Wait(myQueue, 45))
                    {
                        // dispatch any work which is not done yet
                        if( myQueue.Count > 0 )
                            continue;
                    }

                    // finito 
                    if (myAllItemsProcessed)
                    {
                        return null;
                    }
                }

                T result = myQueue.Dequeue();
                if (result == null)
                {
                    myAllItemsProcessed = true;
                    myEmptyEvent.Set();
                }
                return result;
            }
        }

        /// <summary>
        /// Releases the waiters by enqueuing a null reference which causes all waiters to be released. 
        /// They will then get a null reference as queued element to signal that they should terminate.
        /// </summary>
        public void ReleaseWaiters()
        {
            Enqueue(null);
        }

        /// <summary>
        /// Waits until the queue is empty. This does not mean that all items are already processed. Only that
        /// the queue contains no more pending work. 
        /// </summary>
        public void WaitUntilEmpty()
        {
            myEmptyEvent.WaitOne();
        }

        /// <summary>
        /// Adds an element of type T to the queue. 
        /// The consumer thread is notified (if waiting)
        /// </summary>
        /// <param name="data_in">An object of type T</param>      
        public void Enqueue(T data_in)
        {
            lock (myQueue)
            {
                myQueue.Enqueue(data_in);
                Monitor.PulseAll(myQueue);
            }
        }

        /// <summary>
        /// Returns an IEnumerator of Type T for this queue
        /// </summary>
        /// <returns></returns>    
        IEnumerator<T> IEnumerable<T>.GetEnumerator()
        {
            while (true)
            {
                T item = Dequeue();
                if (item == null)
                    break;
                else
                    yield return item;
            }
        }

        /// <summary>
        /// Returns an untyped IEnumerator for this queue
        /// </summary>
        /// <returns></returns>     
        IEnumerator IEnumerable.GetEnumerator()
        {
            return ((IEnumerable<T>)this).GetEnumerator();
        }


        #region IDisposable Members

        /// <summary>
        /// Closes the EmptyEvent WaitHandle.
        /// </summary>
        public void Dispose()
        {
            myEmptyEvent.Close();
        }

        #endregion
    }
}

7
s_nair On

Kieren,

As far as I can tell, there is no thread-safe type for the producer-consumer pattern that does exactly what you want. I don't claim this is a competitive solution, but I propose that you decorate BlockingCollection<T> with a few extension methods, which gives you the freedom to supply any built-in or custom type instead of the default CancellationToken.

Stage 1:

The following default methods use the underlying TryAddWithNoTimeValidation method to add to the queue.

public void Add(T item){
    this.TryAddWithNoTimeValidation(item, -1, new CancellationToken());
}

public void Add(T item, CancellationToken cancellationToken){
    this.TryAddWithNoTimeValidation(item, -1, cancellationToken);
}

public bool TryAdd(T item){
    return this.TryAddWithNoTimeValidation(item, 0, new CancellationToken());
}

public bool TryAdd(T item, TimeSpan timeout){
    BlockingCollection<T>.ValidateTimeout(timeout);
    return this.TryAddWithNoTimeValidation(item, (int) timeout.TotalMilliseconds, new CancellationToken());
}

public bool TryAdd(T item, int millisecondsTimeout){
    BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
    return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, new CancellationToken());
}

public bool TryAdd(T item, int millisecondsTimeout, CancellationToken cancellationToken){
    BlockingCollection<T>.ValidateMillisecondsTimeout(millisecondsTimeout);
    return this.TryAddWithNoTimeValidation(item, millisecondsTimeout, cancellationToken);
}

Now you can provide extensions for any or all of the methods you are interested in.

Stage 2:

You now refer to your own implementation of TryAddWithNoTimeValidation instead of the default one.

I can give you an alternate version of TryAddWithNoTimeValidation which safely continues without throwing an OperationCanceledException.

5
IvoTops On

You could signal the end of a batch by setting a flag on the last item (add an IsLastItem bool property to it, or wrap it). Or you might send a null as the last item (not sure if a null goes through the BlockingCollection correctly though).
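
A small sketch of the wrapping variant, assuming a single consumer and using a value tuple as the wrapper. The tuple element names and the sample data are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Each queued entry carries its payload plus an end-of-batch flag.
using var queue = new BlockingCollection<(int Value, bool IsLastItem)>();
var batch = new List<int>();

Task consumer = Task.Run(() =>
{
    while (true)
    {
        var (value, isLastItem) = queue.Take();   // no cancellation token needed
        batch.Add(value);
        if (isLastItem) break;                    // the item itself ends the batch
    }
});

int[] data = { 10, 20, 30 };
for (int i = 0; i < data.Length; i++)
    queue.Add((data[i], i == data.Length - 1));   // flag only the final item

consumer.Wait();
Console.WriteLine($"Batch size: {batch.Count}");
```

With several consumers, one flagged item would stop only the thread that happens to take it, so this form fits the single-consumer case best.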

If you can remove the need for the 'batch' concept, you can create an extra thread that continuously calls Take() and processes new data from your BlockingCollection and does nothing else.