I am using a tbb::concurrent_bounded_queue to communicate in a buffered fashion between N producer/consumer threads.
Like so:
//which has been set to some max capacity
tbb::concurrent_bounded_queue<MyItemType> g_queue;

void ThreadA()
{
    while(running_A)
    {
        MyItemType outItem;
        //do some stuff, resulting in filling outItem ...
        g_queue.push(outItem);
    }
}

void ThreadB()
{
    while(running_B)
    {
        MyItemType inItem;
        g_queue.pop(inItem);
        //do some stuff with the contents of inItem
    }
}
The example I just showed has only one producer and one consumer; in reality there can be many of both. At a user's request, all processing should cease as soon as possible.
So if thread A has pushed a few items that B has not yet consumed, I want the next call to B's pop to return immediately, somehow indicating that the queue is closed, so that I can break from the loop. Any items on the queue not serviced yet can be discarded.
If thread A is waiting on a push, I want that to return immediately, perhaps indicating that the queue is closed for business.
I read somewhere that one way of doing this is to push a special item (a stop item, an indicator to cease and desist) onto the queue; when that item is popped, I know things are stopping. The issue is that it's a FIFO queue, so any items ahead of the stop item need to be popped before a consumer gets to my stop item, which I want to avoid.
I see there is a q.abort() method that makes waiting pushes and pops return by throwing an exception (it works only if I set the TBB preprocessor define TBB_USE_EXCEPTIONS). I didn't think that turning on exceptions for all of TBB would be a proper way of doing this.
I also thought I could clear the queue before adding my stop item, but theoretically another producer can come in between my current consumer's q.clear() and q.push(stopItem) and push a non-stop item, which is not what I want, since I would have to wait for that item to be processed before a consumer gets to the pushed stop item.
What is the proper way of handling this? I previously had my own queue implementation with a close signal that caused all pops and pushes to return (not just fill in the reference item parameter, but return a code from the function call) a value indicating that the queue was closed. I'm not sure how to get a similar capability with TBB's concurrent bounded queue.
Any suggestions?
Thanks, -Ryan
If I understand you correctly, you are asking how to terminate the producers and consumers at an arbitrary moment without processing the remaining elements. If so, you need a separate notification mechanism, e.g. a flag:
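The snippet that originally followed here is not available, so this is only a minimal sketch of such a flag. The names g_stop_requested, request_stop and stop_requested are made up for illustration; any atomic boolean visible to all worker threads would do.

```cpp
#include <atomic>

// Hypothetical name: one stop flag shared by every producer and consumer.
std::atomic<bool> g_stop_requested{false};

// Called once (e.g. from the UI thread) when the user asks to stop.
inline void request_stop() {
    g_stop_requested.store(true, std::memory_order_release);
}

// Polled by the worker loops.
inline bool stop_requested() {
    return g_stop_requested.load(std::memory_order_acquire);
}
```

An atomic is used rather than a plain bool so that the write from the stopping thread is guaranteed to become visible to the workers without a data race.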
Then, you can read the flag in the loops in order to terminate them if not blocked:
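A sketch of what such loops could look like, under a few assumptions: the flag is a std::atomic<bool> with the hypothetical name g_stop_requested, MyItemType is a placeholder struct, and the loops are templated on the queue type only so that the sketch compiles without TBB. With TBB, the Queue argument would be the tbb::concurrent_bounded_queue<MyItemType> from the question.

```cpp
#include <atomic>
#include <cstddef>

// Hypothetical stop flag, set to true once when the user asks to stop.
std::atomic<bool> g_stop_requested{false};

// Placeholder for the item type from the question.
struct MyItemType { int value = 0; };

// Queue must offer blocking push(const MyItemType&) and pop(MyItemType&).
template <typename Queue>
std::size_t producer_loop(Queue& queue) {
    std::size_t pushed = 0;
    while (!g_stop_requested.load()) {
        MyItemType outItem;
        // ... do some stuff, resulting in filling outItem ...
        queue.push(outItem);  // may still block while the queue is full
        ++pushed;
    }
    return pushed;  // number of items pushed before stopping
}

template <typename Queue>
std::size_t consumer_loop(Queue& queue) {
    std::size_t processed = 0;
    while (!g_stop_requested.load()) {
        MyItemType inItem;
        queue.pop(inItem);  // may still block while the queue is empty
        if (g_stop_requested.load()) {
            break;  // stop arrived while waiting: discard the popped item
        }
        // ... do some stuff with the contents of inItem ...
        ++processed;
    }
    return processed;  // number of items actually processed
}
```

The second check after pop() is what lets a consumer throw away whatever it was handed during shutdown. The flag alone does not wake a thread that is already blocked inside push() or pop().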
If a thread is blocked, abort() looks like a fine way, unless it needs to be done fast and repeatedly. TBB_USE_EXCEPTIONS is turned on by default in the TBB binaries, so you don't actually pay extra for it in the TBB scheduler and concurrent_bounded_queue internals. TBB_USE_EXCEPTIONS is also turned on by default in the TBB headers if the application is compiled with exceptions enabled (often true). So you are likely to pay only for the try/catch block around push and pop.
There are alternatives to the exception. The most straightforward one is to unblock the threads by popping some items to unblock the producers and pushing a dummy item to unblock the consumers. E.g.:
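The original example is not available, so the following is only one possible sketch of that idea. It assumes that each worker loop re-checks the stop flag after every push/pop and decrements a shared counter when it exits. The names stop_all, active_producers and active_consumers are hypothetical. The function is templated on the queue type so it compiles without TBB; it only needs the non-blocking try_pop and try_push calls, which tbb::concurrent_bounded_queue provides.

```cpp
#include <atomic>
#include <thread>

// Placeholder for the item type from the question.
struct MyItemType { int value = 0; };

// Assumption: every producer/consumer decrements its counter when its loop exits.
template <typename Queue>
void stop_all(Queue& queue,
              std::atomic<bool>& stop_flag,
              const std::atomic<int>& active_producers,
              const std::atomic<int>& active_consumers) {
    stop_flag.store(true);

    // Phase 1: keep making room so producers blocked in push() can finish
    // their push, see the flag, and leave their loop.
    while (active_producers.load() > 0) {
        MyItemType discarded;
        if (!queue.try_pop(discarded)) {
            std::this_thread::yield();  // queue empty, producers still winding down
        }
    }

    // Phase 2: feed dummy items so consumers blocked in pop() wake up,
    // see the flag, and leave their loop without processing the dummy.
    while (active_consumers.load() > 0) {
        if (!queue.try_push(MyItemType{})) {
            std::this_thread::yield();  // queue full, consumers still winding down
        }
    }

    // Whatever is left was never serviced and can be discarded.
    MyItemType discarded;
    while (queue.try_pop(discarded)) {}
}
```

Producers are released first so that nobody is still pushing real items while the dummies go in. Compared with abort(), this avoids the exception but needs the extra bookkeeping of the two counters.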