await completion of BlockingCollection


For a multithreaded application, I want to await until a BlockingCollection is completed and empty (IsCompleted = true). I implemented the class below, and it seems to work.

Since it's multithreading I don't even trust my own shadow. Would this be a robust implementation?

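using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;

public class BlockingCollectionEx<T> : BlockingCollection<T>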
{
    public Task WaitCompleted => completedManualResetEvent.Task;
    private readonly TaskCompletionSource completedManualResetEvent = new();

    public new void CompleteAdding()
    {
        base.CompleteAdding();

        lock (completedManualResetEvent)
        {
            if (base.Count == 0 && !completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }

    public new IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var item in base.GetConsumingEnumerable())
            yield return item;

        // Several consumers may reach this point. The lock and the IsCompleted check make sure
        // SetResult runs only once; a second call would throw an InvalidOperationException.
        lock (completedManualResetEvent)
        {
            if (!completedManualResetEvent.Task.IsCompleted)
                completedManualResetEvent.SetResult();
        }
    }
    public new IEnumerable<T> GetConsumingEnumerable(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new T Take() => throw new NotImplementedException();
    public new T Take(CancellationToken cancellationToken) => throw new NotImplementedException();

    public new bool TryTake([MaybeNullWhen(false)] out T item) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, int millisecondsTimeout, CancellationToken cancellationToken) => throw new NotImplementedException();
    public new bool TryTake([MaybeNullWhen(false)] out T item, TimeSpan timeout) => throw new NotImplementedException();
}

Usage:

var x = new BlockingCollectionEx<int> { 1, 2, 3 };
x.CompleteAdding();

Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
    {
        // do stuff in Task 1
    }
});
Task.Run(() =>
{
    foreach (var item in x.GetConsumingEnumerable())
    {
        // do stuff in Task 2
    }
});

await x.WaitCompleted;
Debug.Assert(x.IsCompleted);
// do stuff since the collection is empty

There is 1 answer

Theodor Zoulias On

Your implementation is not robust for general usage, but it may be good enough for an application that honors the following contract:

The collection must be consumed by exactly one consumer, which consumes it exclusively through the GetConsumingEnumerable method.

Violating this contract fails in the following ways:

  1. If there is no consumer, the collection is empty, and the CompleteAdding method is invoked, the WaitCompleted task will never complete.
  2. If there are two or more consumers, the enumeration will fail with an InvalidOperationException for all but one of the consumers.
  3. If there is one consumer, but it consumes the collection using the Take or TryTake methods, the WaitCompleted task will never complete.
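
The InvalidOperationException in point 2 comes from completing the same TaskCompletionSource more than once. The following is only a minimal sketch of that behavior, assuming .NET 5 or later (where the non-generic TaskCompletionSource exists); the class and method names are hypothetical and not part of the question's code:

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionSignalDemo
{
    // Returns true if a second SetResult call throws InvalidOperationException.
    public static bool SecondSetResultThrows()
    {
        var tcs = new TaskCompletionSource();
        tcs.SetResult(); // the first call completes the task
        try
        {
            tcs.SetResult(); // the task is already completed
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // TrySetResult reports failure through its return value instead of throwing.
    public static (bool First, bool Second) TrySetResultTwice()
    {
        var tcs = new TaskCompletionSource();
        bool first = tcs.TrySetResult();
        bool second = tcs.TrySetResult();
        return (first, second);
    }
}
```

If several consumers can reach the signaling code, TrySetResult is one way to make the call idempotent without an explicit lock.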

Without knowing your specific use case, I can't say whether you have a legitimate reason for requesting this functionality. In general, though, waiting for the exact moment that a BlockingCollection<T> becomes empty and completed is usually unimportant. What matters is the moment that the processing of all consumed items is completed, which happens after the completion of the collection.
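
One way to wait for the end of processing rather than for the collection itself is to keep the consumer tasks and await them with Task.WhenAll. The following is a rough sketch under assumptions of my own: the ConsumerCompletionDemo class, the ProcessAllAsync method, and the summing "work" are hypothetical stand-ins for real per-item processing, and a plain BlockingCollection<int> is used instead of the derived class from the question.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class ConsumerCompletionDemo
{
    // Starts consumerCount consumers, adds the items 1..itemCount, and returns
    // the total only after every consumer has finished processing.
    public static async Task<long> ProcessAllAsync(int itemCount, int consumerCount)
    {
        using var collection = new BlockingCollection<int>();

        Task<long>[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                long localSum = 0;
                // The loop ends once the collection is completed and empty.
                foreach (var item in collection.GetConsumingEnumerable())
                    localSum += item; // stand-in for real per-item work
                return localSum;
            }))
            .ToArray();

        for (int i = 1; i <= itemCount; i++)
            collection.Add(i);
        collection.CompleteAdding();

        // Completes when all consumers have processed their last item.
        long[] partialSums = await Task.WhenAll(consumers);
        return partialSums.Sum();
    }
}
```

Calling `await ConsumerCompletionDemo.ProcessAllAsync(100, 3)` returns 5050. Once the awaited call returns, the collection's IsCompleted was necessarily true as well, because each consumer leaves its loop only after the collection is completed and empty.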


Note: this answer targets the Revision 1 version of this question.