I'm making use of the (frankly great) BlockingCollection<T> type for a heavily multithreaded, high-performance app.
There's a lot of throughput through the collection, and on the micro level it's highly performant. However, each 'batch' always ends by flagging the cancellation token, which results in an exception being thrown on any waiting Take call. That's fine, but I would have settled for a return value or output parameter to signal it, because a) exceptions have an obvious overhead and b) when debugging, I don't want to manually turn off break-on-exception for that specific exception.
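For reference, the behaviour described above can be reproduced with a minimal sketch like the one below. The class and method names (TakeCancellationDemo, TakeThrowsOnCancel) are made up for illustration; only BlockingCollection<T>.Take(CancellationToken) and the exception type come from the framework.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class TakeCancellationDemo
{
    // Returns true if a Take(token) call on an empty collection ended
    // with an OperationCanceledException once the token was cancelled.
    public static bool TakeThrowsOnCancel()
    {
        using (var queue = new BlockingCollection<int>())
        using (var cts = new CancellationTokenSource())
        {
            Task<bool> consumer = Task.Run(() =>
            {
                try
                {
                    queue.Take(cts.Token); // blocks: nothing is ever added
                    return false;
                }
                catch (OperationCanceledException)
                {
                    return true;           // the exception the question is about
                }
            });

            Thread.Sleep(100); // let the consumer block first (not needed for correctness)
            cts.Cancel();      // end of the 'batch'
            return consumer.Result;
        }
    }

    public static void Main()
    {
        Console.WriteLine(TakeThrowsOnCancel()); // prints True
    }
}
```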
The implementation seems intense, and in theory I suppose I could disassemble and recreate my own version that didn't use exceptions, but perhaps there's a less complex way?
I could add a null (or, if that isn't possible, a placeholder) object to the collection to signify that the process should end. However, there also needs to be a means to abort nicely, i.e. wake up waiting threads and tell them somehow that something has happened.
So - alternative collection types? Recreate my own? Some way to abuse this one?
(Some context: I went with BlockingCollection<T> because it has an advantage over manual locking around a Queue. As best I can tell, the use of threading primitives is superb, and in my case a few milliseconds here and there and optimal core use are crucial.)
Edit: I've just opened a bounty for this one. I don't believe Anastasiosyal's answer covers the query I raise in my comment of it. I know this is a tough problem. Is anyone able to assist?
As I guess you have already done yourself, I looked into the reflected source of BlockingCollection. Unfortunately, when a CancellationToken is passed into the BlockingCollection and that token is cancelled, you will get an OperationCanceledException. A couple of workarounds follow.
GetConsumingEnumerable invokes TryTakeWithNoTimeValidation on the BlockingCollection, which in turn raises this exception.
Workaround #1
One potential strategy, assuming you have control over your producers and your consumers: rather than passing the cancellation token into the BlockingCollection (which will raise this exception), pass the cancellation token into your producers and your consumers, and give the BlockingCollection calls CancellationToken.None.
If your producers aren't producing and your consumers aren't consuming, then you have effectively cancelled the operation without raising this exception.
Special cases: cancelling when the BlockingCollection is at BoundedCapacity or empty
Producers blocked: The producer threads will be blocked when BoundedCapacity on the BlockingCollection is reached. In that state your consumers are not blocked, but the producers are, because they cannot add any more items to the queue. To cancel, you therefore need to let additional items be consumed (one for each blocked producer thread). That unblocks the producers and allows your cancellation logic to kick in on the producer side.
Consumers blocked: When your consumers are blocked because the queue is empty, you can insert an empty unit of work (one for each consumer thread) into the BlockingCollection to unblock the consumer threads and allow your cancellation logic to kick in on the consumer side.
When there are items in the queue and no limit such as BoundedCapacity or Empty has been reached then the producers and consumer threads should not be blocked.
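A minimal sketch of Workaround #1 for the "consumers blocked" case might look like the following. It assumes an unbounded collection, so adding the wake-up items can never block. WorkItem, its Empty instance, and the Run method are hypothetical names for illustration, not part of any library. The token is checked by the consumers themselves and never handed to Take, so cancellation does not surface as an exception.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class TokenOutsideCollectionDemo
{
    // Hypothetical unit of work; Empty is only used to wake blocked consumers.
    public sealed class WorkItem
    {
        public static readonly WorkItem Empty = new WorkItem();
    }

    // Processes itemCount items, then cancels. Returns the number of items processed.
    public static int Run(int consumerCount, int itemCount)
    {
        int processed = 0;
        using (var queue = new BlockingCollection<WorkItem>()) // unbounded (assumption)
        using (var cts = new CancellationTokenSource())
        {
            CancellationToken token = cts.Token;
            var consumers = new Task[consumerCount];
            for (int i = 0; i < consumerCount; i++)
            {
                consumers[i] = Task.Run(() =>
                {
                    while (true)
                    {
                        WorkItem item = queue.Take();              // no token passed in
                        if (token.IsCancellationRequested) return; // cancellation logic lives here
                        if (ReferenceEquals(item, WorkItem.Empty)) continue;
                        Interlocked.Increment(ref processed);
                    }
                });
            }

            for (int n = 0; n < itemCount; n++) queue.Add(new WorkItem());

            // For the demo only: wait until the batch has been processed.
            SpinWait.SpinUntil(() => Volatile.Read(ref processed) == itemCount);

            cts.Cancel();
            // One empty item per consumer wakes every thread blocked in Take().
            for (int i = 0; i < consumerCount; i++) queue.Add(WorkItem.Empty);

            Task.WaitAll(consumers);
            return processed;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run(3, 10)); // prints 10
    }
}
```

With a bounded collection the Add calls after Cancel could themselves block, which is the "producers blocked" case described above and is not handled by this sketch.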
Workaround #2
Using a cancellation unit of work.
When your application needs to cancel, then your producers (maybe just 1 producer will suffice while the others just cancel producing) will produce a cancellation unit of work (could be null as you also mention or some class that implements a marker interface). When the consumers consume this unit of work and detect that it is in fact a cancellation unit of work, their cancellation logic kicks in. The number of cancellation units of work to be produced needs to equal the number of consumer threads.
Again, caution is needed when the collection is close to BoundedCapacity, as that could be a sign that some of the producers are blocked. Depending on the number of producers and consumers, you could keep one consumer consuming until all producers but one have shut down, which ensures that no producers are left lingering. When only one producer is left, your last consumer can shut down and the producer can stop producing cancellation units of work.
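Workaround #2 could be sketched roughly as below, again assuming an unbounded collection and a single producer (the calling thread). WorkItem, its Stop instance, and Run are hypothetical names chosen for the example. No CancellationToken is involved at all: each consumer exits when it takes a cancellation unit of work, and one is produced per consumer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class SentinelShutdownDemo
{
    // Hypothetical unit of work; Stop is the cancellation unit of work.
    public sealed class WorkItem
    {
        public static readonly WorkItem Stop = new WorkItem(-1);
        public readonly int Value;
        public WorkItem(int value) { Value = value; }
    }

    // Returns the number of real items processed before the consumers shut down.
    public static int Run(int consumerCount, int itemCount)
    {
        int processed = 0;
        using (var queue = new BlockingCollection<WorkItem>()) // unbounded (assumption)
        {
            var consumers = new Task[consumerCount];
            for (int i = 0; i < consumerCount; i++)
            {
                consumers[i] = Task.Run(() =>
                {
                    while (true)
                    {
                        WorkItem item = queue.Take();
                        if (ReferenceEquals(item, WorkItem.Stop)) return; // shut down quietly
                        Interlocked.Increment(ref processed);
                    }
                });
            }

            for (int n = 0; n < itemCount; n++) queue.Add(new WorkItem(n));

            // One cancellation unit of work per consumer thread.
            for (int i = 0; i < consumerCount; i++) queue.Add(WorkItem.Stop);

            Task.WaitAll(consumers);
            return processed;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run(3, 10)); // prints 10
    }
}
```

Because the default backing store is a FIFO queue, the cancellation units here are only reached after the items queued before them. An abort that skips pending work would need an extra flag or a token checked by the consumers, as in Workaround #1.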