I'm testing edge cases with std::condition_variable, and I tried a scenario designed to starve one thread.
The scenario is 99 producers and only one consumer, all of them working on one queue with a maximum size.
The probability that notify_one() (called after the queue reaches its maximum size) wakes the consumer is about 1%.
So it will most likely wake another producer, which will check the predicate and then go back to waiting.
I expected the program to hang at this point, but instead it keeps evaluating predicates until a notification finally hits the consumer. If there is no consumer, predicates are checked across all waiting threads forever.
My question is: does the standard define that notify_one() will try to notify the first waiting thread whose predicate returns true? Then why does it also work for the consumer in the commented-out code, which uses wait() instead of wait_for()? Or is it spurious wakeups that are trying so hard to wake my waiting threads? Or something else?
Tested on Windows with Clang and MSVC.
My code to reproduce the case:
// CVSimple.h
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class CVSimple
{
public:
    static void test() {
        std::queue<int> que;
        std::mutex m;
        std::condition_variable cv;
        int max_size = 10;
        // Read by every worker thread while test() writes it, so it must be atomic.
        std::atomic<bool> working{true};

        auto producer = [&]() {
            while (working) {
                std::unique_lock<std::mutex> lock(m);
                std::chrono::milliseconds t(200);
                // Prints "T" if there is room in the queue, "F" if it is full.
                auto predicate = [&que, &max_size]() {
                    if (que.size() < max_size) {
                        std::cout << "T";
                        return true;
                    }
                    std::cout << "F";
                    return false;
                };
                if (cv.wait_for(lock, t, predicate)) {
                    std::cout << "+";
                    std::this_thread::sleep_for(std::chrono::milliseconds(50));
                    que.push(1);
                    lock.unlock();
                    std::cout << "N";
                    cv.notify_one();
                }
                else {
                    //std::cout << "P";
                }
            }
        };

        auto consumer = [&]() {
            while (working) {
                std::unique_lock<std::mutex> lock(m);
                std::chrono::milliseconds t(200);
                // Prints "t" if the queue has an element, "f" if it is empty.
                auto predicate = [&que]() {
                    if (!que.empty()) {
                        std::cout << "t";
                        return true;
                    }
                    else {
                        std::cout << "f";
                        return false;
                    }
                };
                //cv.wait(lock, predicate);
                //std::cout << "-";
                //std::this_thread::sleep_for(std::chrono::milliseconds(50));
                //que.pop();
                //lock.unlock();
                //std::cout << "n";
                //cv.notify_one();
                if (cv.wait_for(lock, t, predicate)) {
                    std::cout << "-";
                    std::this_thread::sleep_for(std::chrono::milliseconds(50));
                    que.pop();
                    lock.unlock();
                    std::cout << "n";
                    cv.notify_one();
                }
                else {
                    std::cout << "o";
                }
            }
        };

        int nprod = 100;
        int ncons = 1;
        std::cout << "Start producers" << std::endl;
        std::vector<std::thread> threads;
        for (int i = 0; i < nprod; ++i) {
            threads.emplace_back(producer);
        }
        std::cout << "Start consumers" << std::endl;
        for (int i = 0; i < ncons; ++i) {
            threads.emplace_back(consumer);
        }
        std::this_thread::sleep_for(std::chrono::seconds(20));
        std::cout << "Stop working" << std::endl;
        working = false;
        for (auto& th : threads) {
            if (th.joinable()) {
                th.join();
            }
        }
    }
};

// main.cpp
#include "CVSimple.h"

int main()
{
    CVSimple::test();
}
Output:
Start producers
T+Start consumers
NT+NT+NT+NT+NT+NT+NT+NT+NT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFt-nT+NFFFFFFFFFFt-nT+NFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
So when the queue is full, after the last element is added and the notification is sent, the first producer fails its predicate (the first "F" in the output). I then expect it to wait for another notification ("N"), which does not happen, but a lot of predicate checks keep happening. That means something is triggering them. Is it spurious wakeups?
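For reference, notify_one() never looks at anyone's predicate. The standard specifies the predicate overload cv.wait(lock, pred) as equivalent to while (!pred()) cv.wait(lock);, so whichever thread is woken re-checks its own predicate after re-acquiring the mutex and, if it is still false, simply goes back to waiting. The minimal sketch below spells that loop out in a hypothetical helper, wait_with_predicate, and forces one "wrong" wakeup by notifying while the condition is still false; count_predicate_checks is likewise an illustrative name, not part of any library.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: the loop that cv.wait(lock, pred) is specified to be
// equivalent to. notify_one() never evaluates pred; the woken thread does,
// after re-acquiring the mutex.
template <class Predicate>
void wait_with_predicate(std::condition_variable& cv,
                         std::unique_lock<std::mutex>& lock, Predicate pred) {
    while (!pred()) {
        cv.wait(lock);  // returns on a notification or spuriously
    }
}

// Sends one notification while the condition is still false, then a real
// one. Returns how many times the waiter evaluated its predicate.
int count_predicate_checks() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;  // guarded by m
    int checks = 0;      // guarded by m

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        wait_with_predicate(cv, lock, [&] { ++checks; return ready; });
        assert(ready);  // the loop only exits once the condition holds
    });

    // Spin until the predicate has been evaluated at least n times. The waiter
    // holds m from each check until cv.wait() releases it, so once we can see
    // the new count, the waiter is inside cv.wait().
    auto await_checks = [&](int n) {
        for (;;) {
            std::lock_guard<std::mutex> guard(m);
            if (checks >= n) return;
        }
    };

    await_checks(1);
    cv.notify_one();  // "wrong" wakeup: ready is still false
    await_checks(2);  // the waiter re-checked, saw false, and waits again
    {
        std::lock_guard<std::mutex> guard(m);
        ready = true;
    }
    cv.notify_one();  // this wakeup finds ready == true
    waiter.join();
    return checks;
}
```

Without spurious wakeups, count_predicate_checks() returns 3 (on entry, after the wrong wakeup, after the real one); spurious wakeups can only add to that count.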
I've tried the code provided in the question. It worked, but I'm not sure why.
The "FFFFFFFFFFF" happens after the queue becomes full. Here's what I think:
Your producer function calls cv.wait_for(lock, t, predicate), where t is a timeout value. Once the queue is full, all of your producer threads wait with a timeout for a notification that never comes. Each time the wait_for call times out, the producer does nothing† except go back to the top of the loop and call wait_for again. Each call to wait_for evaluates the given predicate on entry and once more when it times out, printing an "F" each time because the queue is still full. With 99 producer threads and a 200 ms timeout, I reckon you should see roughly 990 "F"s per second (99 threads × 5 timeouts per second × 2 checks per timeout), peppered with an occasional "t-nT+N" each time the consumer thread gets a lucky notification.

I added a main routine and ran the program. What I actually saw was long runs of "FFFFFFF" punctuated by occasional runs of "t-nt-nt-n..." (usually nine or ten repeats), followed by an equal number of "T+N", and then back to "FFFFFFF".
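For the timed overloads, the standard specifies wait_for(lock, rel_time, pred) as wait_until(lock, steady_clock::now() + rel_time, pred), and that in turn as equivalent to the loop in the hypothetical helper wait_until_with_predicate below: the predicate is evaluated on entry, after every wakeup, and once more when the deadline passes. This is a minimal sketch, assuming no other thread ever makes the predicate true, as for a producer facing a full queue; checks_per_timed_out_call is an illustrative name.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: the loop that cv.wait_until(lock, deadline, pred) is
// specified to be equivalent to.
template <class Clock, class Duration, class Predicate>
bool wait_until_with_predicate(
        std::condition_variable& cv, std::unique_lock<std::mutex>& lock,
        const std::chrono::time_point<Clock, Duration>& deadline,
        Predicate pred) {
    while (!pred()) {  // checked on entry and after each wakeup
        if (cv.wait_until(lock, deadline) == std::cv_status::timeout) {
            return pred();  // checked once more after the timeout
        }
    }
    return true;
}

// Counts predicate evaluations in a single call that times out because the
// condition never becomes true (like a producer facing a full queue).
int checks_per_timed_out_call(std::chrono::milliseconds timeout) {
    std::mutex m;
    std::condition_variable cv;
    int checks = 0;
    std::unique_lock<std::mutex> lock(m);
    const bool satisfied = wait_until_with_predicate(
        cv, lock, std::chrono::steady_clock::now() + timeout,
        [&] { ++checks; return false; });
    assert(!satisfied);
    (void)satisfied;  // avoid an unused-variable warning under NDEBUG
    return checks;
}
```

checks_per_timed_out_call(std::chrono::milliseconds(200)) should return 2 unless spurious wakeups add extra checks, so a producer facing a full queue prints two "F"s per timeout.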
But wait! (I hear you cry.) Why aren't there five "o"s being printed per second when the consumer's wait_for(...) call times out? I don't know, but maybe the consumer is being starved for the lock. Remember that, even after the timeout, wait_for(lock, ...) cannot return until it re-acquires the lock, and there are 99 other threads all fighting each other for ownership of it. The fact that, when the consumer does get to run, it almost always drains the entire queue before any producer is allowed to run again is a strong hint that you've got starvation issues.
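The point that a timed-out wait_for(lock, ...) still has to re-acquire the mutex before it can return can be checked directly. In this minimal sketch (the function name timed_out_wait_duration and the timings are illustrative assumptions), one thread waits with a short timeout on a predicate that is never true, while the main thread grabs the mutex as soon as wait_for releases it and holds it well past that timeout.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical demo: how long does wait_for(lock, timeout, pred) really take
// when another thread owns the mutex at the moment the timeout expires?
std::chrono::milliseconds timed_out_wait_duration(
        std::chrono::milliseconds timeout, std::chrono::milliseconds hold) {
    using Clock = std::chrono::steady_clock;
    std::mutex m;
    std::condition_variable cv;
    std::atomic<bool> about_to_wait{false};
    Clock::duration elapsed{};

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        about_to_wait = true;
        const auto start = Clock::now();
        // The predicate is never true, so this call can only end by timing out.
        const bool satisfied = cv.wait_for(lock, timeout, [] { return false; });
        elapsed = Clock::now() - start;
        assert(!satisfied);
        (void)satisfied;  // avoid an unused-variable warning under NDEBUG
    });

    while (!about_to_wait) {
        std::this_thread::yield();
    }
    {
        // Blocks until wait_for() releases m, then keeps it past the timeout.
        std::lock_guard<std::mutex> guard(m);
        std::this_thread::sleep_for(hold);
    }
    waiter.join();
    return std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
}
```

On a typical machine, timed_out_wait_duration(std::chrono::milliseconds(50), std::chrono::milliseconds(300)) should report roughly 300 ms rather than 50 ms: the deadline passed after 50 ms, but the call could not return until the main thread let go of the mutex. With 99 producers contending for the same mutex, the consumer's timed-out waits can be delayed the same way.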
†It would do something if you'd uncomment the std::cout << "P" line.