Implementation of a Bounded Blocking Queue Using C++

3.2k views Asked by At

I'm not so familiar with the blocking queue with thread safety, so I just Googled and found the following references:

  1. C++ pthread blocking queue deadlock (I think)

  2. http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

So I just copied and created my Bounded Blocking Queue using C++. Are there any problems with my implementation?

// Assume all includes are there
struct BoundedBlockingQueueTerminateException
    : virtual std::exception,
      virtual boost::exception
{

};

template<typename T>
class BoundedBlockingQueue
{
private:
    std::queue<T>                        q;
    boost::mutex                         mtx;
    boost::mutex::condition_variable_any cond1; // q.empty() condition
    boost::mutex::condition_variable_any cond2; // q.size() == size condition
    int                                  nblocked1;
    int                                  nblocked2;
    bool                                 stopped;
    int                                  size;

public:
    BoundedBlockingQueue(int size)
        : size(size), nblocked1(0), nblocked2(0), stopped(false)
    {
        if (size < 1)
        {
            // BOOST_THROW_EXCEPTION
        }
    }

    ~BoundedBlockingQueue()
    {
        Stop(true);
    }

    bool Empty()
    {
        boost::mutex::scoped_lock lock(mtx);
        return q.empty();
    }

    std::size_t Size()
    {
        boost::mutex::scoped_lock lock(mtx);
        return q.size();
    }

    bool TryPush(const T& item)
    {
        boost::mutex::scoped_lock lock(mtx);
        if (q.size() == size)
            return false;

        q.push(item);
        lock.unlock();
        cond1.notify_one();
        return true;
    }

    void WaitPush(const T& item)
    {
        boost::mutex::scoped_lock lock(mtx);

        ++nblocked2;
        while (!stopped && q.size() == size)
            cond2.wait(lock);
        --nblocked2;

        if (stopped)
        {
            cond2.notify_all();
            BOOST_THROW_EXCEPTION(BoundedBlockingQueueTerminateException());
        }

        q.push(item);
        lock.unlock(mtx);
        cond1.notify_one();
    }

    bool TryPop(T& popped)
    {
        boost::mutex::scoped_lock lock(mtx);
        if (q.empty())
            return false;

        popped = q.front();
        q.pop();
        lock.unlock(mtx);
        cond2.notify_one();
        return true;
    }

    void WaitPop(T& popped)
    {
        boost::mutex::scoped_lock lock(mtx);

        ++nblocked1;
        while (!stopped && q.empty())
            cond1.wait(lock);
        --nblocked1;

        if (stopped)
        {
            cond1.notify_all();
            BOOST_THROW_EXCEPTION(BoundedBlockingQueueTerminateException());
        }

        popped = q.front();
        q.pop();
        cond2.notify_one();
    }

    void Stop(bool wait)
    {
        boost::mutex::scoped_lock lock(mtx);
        stopped = true;
        cond1.notify_all();
        cond2.notify_all();

        if (wait)
        {
            while (nblocked1)
                cond1.wait(lock);
            while (nblcoked2)
                cond2.wait(lock);
        }
    }
};
0

There are 0 answers