BlockingCollection<T> throws unexpected InvalidOperationException


I was trying out BlockingCollection<T> (as a queue) in .NET 8, and sometimes I get this exception:

"System.InvalidOperationException: The collection argument is empty and has been marked as complete with regards to additions."

Code example:

    private static void TestMethod()
    {
        using BlockingCollection<DummyClass> queue = new BlockingCollection<DummyClass>();

        var task = Task.Run(() =>  //Produce
        {
            for (int i = 0; i < 10000000; i++)
            {
                queue.Add(new DummyClass());
            }
            queue.CompleteAdding();

        });

        int counter = 0;
        try
        {
            while (!queue.IsCompleted)  //Consume
            {
                DummyClass item = queue.Take(); // <-- Sometimes exception here
                counter++;
            }

            Console.WriteLine($"counter={counter} ");
        } 
        catch (Exception ex)
        {
            Console.WriteLine("Error:" + ex.ToString());
        }
    }

The documentation for IsCompleted describes it as "Whether this collection has been marked as complete for adding and is empty."

So as long as CompleteAdding() has not been called, Take() should block, and once CompleteAdding() has been called and the queue is empty, !queue.IsCompleted should evaluate to false and end the loop.

What am I missing?

Any help would be very much appreciated.

Running in VS2022 17.8.5 on Windows 11.


There are 2 answers

12
Marc Gravell (best answer)

You have a race condition between the two threads: the IsCompleted check and the Take() call are not one atomic operation. Imagine:

  • the queue is empty
  • thread A is about to call queue.CompleteAdding();
  • thread B is about to check while (!queue.IsCompleted)
  • thread B gets some cycles, finds it is not currently completed
  • thread A gets some cycles, marks the queue as complete
  • thread B gets some cycles, calls Take()

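and boom: Take() throws InvalidOperationException, because the queue is now empty and marked as complete, so no item can ever arrive.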
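The interleaving above can be replayed deterministically on a single thread, which may make the failure easier to see. This is only a sketch: the class and method names (RaceReplay, TakeThrowsAfterStaleCheck) are made up for illustration, and the queue holds int instead of the question's DummyClass.

```csharp
using System;
using System.Collections.Concurrent;

public static class RaceReplay
{
    // Replays the race step by step on one thread.
    // Returns true if Take() threw InvalidOperationException.
    public static bool TakeThrowsAfterStaleCheck()
    {
        using var queue = new BlockingCollection<int>();

        // "Thread B" checks the loop condition: the queue is empty
        // but not yet marked complete, so IsCompleted is false.
        bool completedSeenByConsumer = queue.IsCompleted;

        // "Thread A" runs in between and marks the queue as complete.
        queue.CompleteAdding();

        if (completedSeenByConsumer)
        {
            return false; // not reached in this replay
        }

        try
        {
            // "Thread B" acts on its stale check and calls Take()
            // on a queue that is now empty and complete.
            queue.Take();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine(TakeThrowsAfterStaleCheck());
    }
}
```

In the real program the window between the check and the call is tiny, which is why the exception only shows up sometimes.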

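Solution: use TryTake with a timeout (for example Timeout.Infinite). It blocks like Take(), but returns false instead of throwing once the queue is empty and marked as complete.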
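A minimal sketch of what a TryTake-based consumer might look like, assuming the same single-producer, single-consumer setup as in the question. The names TryTakeConsumer and ProduceAndConsume are hypothetical, and int stands in for DummyClass.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class TryTakeConsumer
{
    // Produces itemCount items on a background task and consumes them
    // on the calling thread. Returns the number of items consumed.
    public static int ProduceAndConsume(int itemCount)
    {
        using var queue = new BlockingCollection<int>();

        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 0; i < itemCount; i++)
                {
                    queue.Add(i);
                }
            }
            finally
            {
                // Always signal completion so the consumer cannot block forever.
                queue.CompleteAdding();
            }
        });

        int counter = 0;

        // With an infinite timeout, TryTake blocks until an item arrives and
        // returns false once the queue is empty and marked as complete.
        // There is no separate IsCompleted check, so there is no race window.
        while (queue.TryTake(out int item, Timeout.Infinite))
        {
            counter++;
        }

        producer.Wait(); // surface any producer exception before disposing
        return counter;
    }

    public static void Main()
    {
        Console.WriteLine($"counter={ProduceAndConsume(100000)}");
    }
}
```

Note that the parameterless-timeout overload TryTake(out item) does not block: it returns false immediately when the queue is momentarily empty, so it would end this loop too early.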

0
Theodor Zoulias

The best way to consume a BlockingCollection<T> is the GetConsumingEnumerable method:

foreach (DummyClass item in queue.GetConsumingEnumerable())
{
    // Process the consumed item
}

Unfortunately, this handy method is not featured prominently enough in Microsoft's documentation and examples. Personally, I have never needed either the IsCompleted or the Take member of the collection. Here is how it is implemented internally:

public IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken)
{
    if (IsCompleted)
    {
        yield break;
    }

    using CancellationTokenSource linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _consumersCancellationTokenSource.Token);
    while (TryTakeWithNoTimeValidation(out T? item, Timeout.Infinite, cancellationToken, linkedTokenSource))
    {
        yield return item;
    }
}
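Applied to the question's code, the consumer loop could look roughly like the sketch below. The names ConsumingEnumerableDemo and ProduceAndConsume are hypothetical, and DummyClass is declared here as an empty placeholder because its definition is not shown in the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Placeholder for the question's DummyClass (its real definition is not shown).
public sealed class DummyClass { }

public static class ConsumingEnumerableDemo
{
    // Produces itemCount items on a background task and consumes them
    // with GetConsumingEnumerable. Returns the number of items consumed.
    public static int ProduceAndConsume(int itemCount)
    {
        using var queue = new BlockingCollection<DummyClass>();

        Task producer = Task.Run(() =>
        {
            try
            {
                for (int i = 0; i < itemCount; i++)
                {
                    queue.Add(new DummyClass());
                }
            }
            finally
            {
                // Completing the queue is what ends the consumer's foreach.
                queue.CompleteAdding();
            }
        });

        int counter = 0;

        // The enumeration blocks while the queue is empty and ends normally
        // once the queue is empty and marked as complete.
        foreach (DummyClass item in queue.GetConsumingEnumerable())
        {
            counter++;
        }

        producer.Wait(); // surface any producer exception before disposing
        return counter;
    }

    public static void Main()
    {
        Console.WriteLine($"counter={ProduceAndConsume(100000)}");
    }
}
```

The loop never consults IsCompleted on its own; the completion check and the take happen inside a single TryTake-style call, as the internal implementation above shows.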