Does condition variable notify_one keep trying until it reaches a waiting thread with a positive predicate?

I'm testing edge cases with std::condition_variable, and I tested a scenario meant to starve one thread. There are 99 producers and only one consumer, all working on a single queue with a maximum size. The probability that notify_one (after the queue reaches its maximum size) will hit the consumer is close to 1%. So it will most likely hit another producer, which will check the predicate and then wait again.

I expected the program to hang at this point, but what I saw is that the program keeps checking predicates until it finally hits the consumer. If there is no consumer, the predicates are checked across all waiting threads forever.

My question is: does the standard define that notify_one() will try to notify the first waiting thread whose predicate is true? If so, why does it also work with the alternative consumer in the commented-out code? Or are spurious wakeups trying so hard to wake my waiting threads? Or is it something else?

Tested on Windows with Clang and MSVC.

My code to reproduce the case:

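#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class CVSimple
{

public:
    static void test() {
        std::queue<int> que;
        std::mutex m;
        std::condition_variable cv;
        int max_size = 10;
        // Read by every worker thread and written by test(), so it must be atomic.
        std::atomic<bool> working{true};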


        auto producer = [&]() {
            while (working) {
                std::unique_lock<std::mutex> lock(m);
                std::chrono::milliseconds t(200);
                auto predicate = [&que, &max_size]() { 
                    if (que.size() < max_size) {
                        std::cout << "T";
                        return true;
                    } 
                    std::cout << "F";
                    return false;
                };
                if (cv.wait_for(lock, t, predicate)) {
                    std::cout << "+";
                    std::this_thread::sleep_for(std::chrono::milliseconds(50));
                    que.push(1);
                    lock.unlock();
                    std::cout << "N";
                    cv.notify_one();
                }
                else {
                    //std::cout << "P";
                }

            }
        };

        auto consumer = [&]() {
            while (working) {
                std::unique_lock<std::mutex> lock(m);
                std::chrono::milliseconds t(200);
                auto predicate = [&que]() {
                    if (!que.empty()) {
                        std::cout << "t";
                        return true;
                    }
                    else {
                        std::cout << "f";
                        return false;
                    };
                };
                //cv.wait(lock, predicate);
                //std::cout << "-";
                //std::this_thread::sleep_for(std::chrono::milliseconds(50));
                //que.pop();
                //lock.unlock();
                //std::cout << "n";
                //cv.notify_one();


                if (cv.wait_for(lock, t, predicate)) {
                    std::cout << "-";
                    std::this_thread::sleep_for(std::chrono::milliseconds(50));
                    que.pop();
                    lock.unlock();
                    std::cout << "n";
                    cv.notify_one();
                }
                else {
                    std::cout << "o";
                }
            }
        };

        int nprod = 100;
        int ncons = 1;

        std::cout << "Start producers" << std::endl;
        std::vector<std::thread> threads;
        for (int i = 0; i < nprod; ++i) {
            threads.emplace_back(producer);
        }

        std::cout << "Start consumers" << std::endl;
        for (int i = 0; i < ncons; ++i) {
            threads.emplace_back(consumer);
        }

        std::this_thread::sleep_for(std::chrono::seconds(20));
        std::cout << "Stop working" << std::endl;
        working = false;
        for (auto& th : threads) {
            if (th.joinable()) {
                th.join();
            }
        }
        return;
    }
};

#include "CVSimple.h"

int main()
{
    CVSimple::test();
}

Output:

Start producers
T+Start consumers
NT+NT+NT+NT+NT+NT+NT+NT+NT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFt-nT+NFFFFFFFFFFt-nT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF

So when the queue is full, after the last element is added and the notification is sent, the first producer fails its predicate (the first F in the output). I then expect it to wait for another notification (N), which does not happen, yet many more predicate checks follow. That means something is triggering them. Is it a spurious wakeup?

I've tried the code provided in the question; it worked, but I'm not sure why.

Answer by Solomon Slow

The "FFFFFFFFFFF" happens after the queue becomes full. Here's what I think:

Your producer function calls cv.wait_for(lock, t, predicate), where t is a timeout value. Once the queue is full, all of your producer threads wait with a timeout for a notification that never comes. Each time the wait_for call times out, the producer does nothing† except go back to the top of the loop and call wait_for again. Each call tests the given predicate on entry and again when the timeout expires, printing an "F" each time because the queue is still full.

With 99 producer threads and a 200ms timeout, I reckon that is about 495 timed-out wait_for calls per second, each printing at least one "F", peppered with an occasional "t-nT+N" each time the consumer thread gets a lucky notification.
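
As a rough illustration of the timeout explanation above, the sketch below waits on a condition variable that nobody ever notifies and counts how often the predicate runs. The helper name count_predicate_calls is made up for this example. The standard specifies the predicate overload of wait_for as a loop that tests the predicate before blocking and again after a timeout, so a timed-out call should evaluate it at least twice, with no notify_one involved at all.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: waits with a predicate that is always false on a
// condition variable that is never notified, and returns how many times
// the predicate was evaluated before wait_for gave up.
int count_predicate_calls(std::chrono::milliseconds timeout) {
    std::mutex m;
    std::condition_variable cv;
    int calls = 0;

    std::unique_lock<std::mutex> lock(m);
    const bool satisfied = cv.wait_for(lock, timeout, [&calls] {
        ++calls;       // corresponds to one "F" in the question's output
        return false;  // the "queue" stays full forever
    });

    assert(!satisfied);        // the wait can only end by timing out
    assert(lock.owns_lock());  // wait_for re-acquires the mutex before returning
    return calls;
}
```

Calling count_predicate_calls(std::chrono::milliseconds(200)) should return 2 or more (spurious wakeups can add to the count), which suggests the long runs of "F" need no notification to explain them.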

I added a main routine, and I ran the program:

int main(int argc, char* argv[]) {
    CVSimple cvs;
    cvs.test();
}

What I actually saw was long runs of "FFFFFFF" punctuated by occasional runs of "t-nt-nt-n..." (usually nine or ten repeats), followed by an equal number of "T+N", and then back to "FFFFFFF".


But Wait! (I hear you cry.) Why aren't there five "o"s being printed per second when the consumer's wait_for(...) call times out?

I don't know, but maybe the consumer is being starved for the lock. Remember that, even after the timeout, wait_for(lock,...) cannot return until it re-acquires the lock, and there are 99 other threads all fighting each other for ownership of it.
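
To make the re-acquisition point concrete, here is a minimal sketch, assuming a second thread that simply holds the mutex for a while. The names WaitResult and timed_wait_under_contention are invented for this example. It measures how long a wait_for with a short timeout takes when the mutex is busy at the moment the timeout expires.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

struct WaitResult {
    bool contended;                     // true if the mutex was held during the wait
    std::chrono::milliseconds elapsed;  // how long wait_for took to return
};

// Hypothetical helper: one thread calls wait_for(timeout) while the calling
// thread grabs the mutex and holds it for `hold`.
WaitResult timed_wait_under_contention(std::chrono::milliseconds timeout,
                                       std::chrono::milliseconds hold) {
    using clock = std::chrono::steady_clock;
    std::mutex m;
    std::condition_variable cv;
    bool waiting = false;  // protected by m
    bool done = false;     // protected by m
    clock::duration elapsed{};

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        waiting = true;
        const auto start = clock::now();
        cv.wait_for(lock, timeout, [] { return false; });  // nobody notifies
        elapsed = clock::now() - start;  // taken only after m is re-acquired
        done = true;
    });

    bool contended = false;
    for (;;) {
        {
            std::unique_lock<std::mutex> lock(m);
            if (done) break;  // the waiter finished before we got the mutex
            if (waiting) {
                // The waiter released m inside wait_for; hold it past the timeout.
                contended = true;
                std::this_thread::sleep_for(hold);
                break;
            }
        }
        std::this_thread::yield();
    }

    waiter.join();
    assert(done);
    return {contended, std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)};
}
```

With a 20ms timeout and a 200ms hold, the measured time should be at least 200ms whenever contended is true. With 99 other threads competing for the same mutex, a consumer's wait_for could plausibly be delayed in a similar way.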

The fact that the consumer, when it does get to run, almost always drains the entire queue before any producer is allowed to run again is a strong hint that you've got starvation issues.
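
One common way to keep notify_one from waking the wrong kind of thread is to give producers and consumers separate condition variables. This is not something the question's code does, and it is only a sketch under that assumption: BoundedQueue, not_full_, not_empty_ and run_demo are names invented here. It does not make the mutex itself fair, so some lock starvation may remain.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical bounded queue: producers wait on not_full_, consumers on
// not_empty_, so a notification always reaches a thread of the right kind.
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t max_size) : max_size_(max_size) {
        assert(max_size > 0);
    }

    void push(int value) {
        std::unique_lock<std::mutex> lock(m_);
        not_full_.wait(lock, [this] { return que_.size() < max_size_; });
        que_.push(value);
        lock.unlock();
        not_empty_.notify_one();  // only consumers wait on not_empty_
    }

    int pop() {
        std::unique_lock<std::mutex> lock(m_);
        not_empty_.wait(lock, [this] { return !que_.empty(); });
        const int value = que_.front();
        que_.pop();
        lock.unlock();
        not_full_.notify_one();  // only producers wait on not_full_
        return value;
    }

private:
    std::mutex m_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    std::queue<int> que_;
    std::size_t max_size_;
};

// Starts `producers` threads that each push `per_producer` ones, consumes
// everything on the calling thread, and returns the sum of consumed values.
int run_demo(int producers, int per_producer, std::size_t max_size) {
    BoundedQueue q(max_size);
    std::vector<std::thread> threads;
    for (int i = 0; i < producers; ++i) {
        threads.emplace_back([&q, per_producer] {
            for (int k = 0; k < per_producer; ++k) q.push(1);
        });
    }

    int consumed = 0;
    for (int k = 0; k < producers * per_producer; ++k) consumed += q.pop();

    for (auto& t : threads) t.join();
    return consumed;
}
```

For example, run_demo(8, 50, 10) pushes 400 items through a queue of size 10 and should return 400 without relying on any timeout.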


† It would do something if you'd uncomment the std::cout << "P" line.