Proper way to cancel all waiting pushes or pops on tbb::concurrent_bounded_queue?

1k views Asked by At

I am using a tbb::concurrent_bounded_queue to communicate in a buffered fashion between N producer/consumer threads.

Like so:

//which has been set to some max capacity
tbb::concurrent_bounded_queue<MyItemType> g_queue; 

ThreadA()
{ 
  while(running_A)
  { 
    MyItemType outItem;
    //do some stuff, resulting in filling outItem ...
    g_queue.push(outItem);
  }
}

ThreadB()
{
  while(running_B)
  { 
    MyItemType inItem;
    g_queue.pop(inItem);
    //do some stuff with the contents of inItem
  }
}

The example I just showed only has one producer and one consumer, in reality there can many of both. At a user's request all processing should cease as soon as possible.

So if thread A has pushed a few items, that B had not consumed, I want the next call to B's pop to immediately return somehow indicating that the queue is closed so that I can break from the loop. Any items on the queue not serviced yet can be discarded.

If thread A is waiting on a push, I want that to return immediately, perhaps indicating that the queue is closed for business.

I read some where a way of potentially doing this is to push a special item (stop item, indicator of cease and desist) on the queue, then when that item is popped i can know things are stopping. The issue with this is that it's a FIFO queue, and any items ahead of the stop item need to be popped before it can get to my stop item. Which i want to avoid.

I see there is an q.abort method, that would cause waiting push or pops to return throwing an exception (works only if I set a tbb preprocessor define TBB_USE_EXCEPTIONS). Didn't think having to turn on exceptions for all of tbb to be a proper way of doing this?

I also thought I could clear the queue before adding my stop item, but theoretically another producer can come in between my current consumers q.clear() and q.push(stopItem), and push a non-stopItem, which is not what I want. Since i would have to wait for that non-stopItem to be processed before it can get to the pushed stopItem.

What is the proper way of handling this? I previously had my own queue implementation that had a close signal that would cause all pops and pushes to return (not just fill in the reference item parameter, but return a code from the function all) with a certain value code that would indicate that the queue was closed. Not sure how to have a similar capability with tbb's concurrent bounded queue.

Any suggestions?

Thanks, -Ryan

1

There are 1 answers

4
Anton On

If I understand you correctly, you ask how to terminate producer&consumer at any random moment without processing remaining elements. If so, you need a separate notification mechanism, e.g. a flag:

tbb::atomic<bool> is_finished = false;

Then, you can read the flag in the loops in order to terminate them if not blocked:

ThreadA()
{ 
  while(running_A && !is_finished)
  { 
    MyItemType outItem;
    //do some stuff, resulting in filling outItem ...
    try { g_queue.push(outItem); } catch(...) {}
  }
}

ThreadB()
{
  while(running_B && !is_finished)
  { 
    MyItemType inItem;
    try { g_queue.pop(inItem); } catch(...) {}
    if(is_finished) // check before processing
        break;
    //do some stuff with the contents of inItem
  }
}

Terminate()
{
    is_finished = true;
    g_queue.abort();
}

If a thread is blocked, abort() looks like a fine way unless it needs to be done fast and repeatedly. TBB_USE_EXCEPTIONS is turned on by default in TBB binaries, so, you don't actually pay for it in TBB scheduler and conurrent_bounded_queue internals. And TBB_USE_EXCEPTIONS is turned on by default in TBB headers if application is compiled with enabled exceptions (often true). So you likely to pay only for try/catch block around push&pop.

There are alternatives to the exception. The most straightforward one is to unblock the threads by popping some items to unblock the producers and pushing a dummy item to unblock the consumers. E.g.:

Terminate()
{
    is_finished = true;
    MyItemType dummyItem;
    while(g_queue.size() < 0) // consumers are blocked
        g_queue.try_push(dummyItem);
    while(g_queue.size() >= g_queue.capacity() ) // producers are blocked
        g_queue.try_pop(dummyItem);
}