C++ - Multi-threading - Communication between threads

19k views Asked by At
#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>

using namespace std;

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone){
    while(!workdone) {
        unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one();
     }
}

void work(queue<int> & numbers, condition_variable & cv, mutex & m, bool & workdone) {
    while(!workdone) {
        unique_lock<std::mutex> lk(m);
        cv.wait(lk);
        cout << numbers.front() << endl;
        numbers.pop();
        consumer_count++;

     }
}

int main() {
    condition_variable cv;
    mutex m;
    bool workdone = false;
    queue<int> numbers;

    //start threads
    thread producer(generateNumbers, ref(numbers), ref(cv), ref(m),     ref(workdone));
    thread consumer(work, ref(numbers), ref(cv), ref(m), ref(workdone));

    //wait for 3 seconds, then join the threads
    this_thread::sleep_for(std::chrono::seconds(3));
    workdone = true;

    producer.join();
    consumer.join();

    //output the counters
    cout << producer_count << endl;
    cout << consumer_count << endl;

    return 0;
}

Hello Everyone, I tried to implement the Producer-Consumer-Pattern with C++. The producer thread generates random integers, adds them to a queue and then notifies the consumer thread that a new number was added.

The consumer thread waits for the notification and then prints the first element of the queue to the console and deletes it.

I incremented a counter for every number that is added to the queue and another counter for every number that is taken out of the queue.

I expected the two counters to hold the same value after the program is finished, however the difference is huge. The counter that represents the addition to the queue is always in the million range (3871876 in my last test) and the counter that represents the consumer which takes numbers out of the queue is always below 100k (89993 in my last test).

Can someone explain to me why there is such a huge difference? Do I have to add another condition variable so that the producer threads waits for the consumer thread as well? Thanks!

2

There are 2 answers

4
Jonas On BEST ANSWER

No need for a second std::condition_variable, just reuse the one you have. As mentioned by other you should consider using std::atomic<bool> instead of plain bool. But I must admit that g++ with -O3 does not optimize it away.

#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load())
    {
        std::unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one(); // Notify worker
        cv.wait(lk); // Wait for worker to complete
     }
}

void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load())
    {
        std::unique_lock<std::mutex> lk(m);
        cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
        cv.wait(lk); // Wait for the generator to complete
        std::cout << numbers.front() << std::endl;
        numbers.pop();
        consumer_count++;
     }
}

int main() {
    std::condition_variable cv;
    std::mutex m;
    std::atomic<bool> workdone(false);
    std::queue<int> numbers;

    //start threads
    std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
    std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));


    //wait for 3 seconds, then join the threads
    std::this_thread::sleep_for(std::chrono::seconds(3));
    workdone = true;
    cv.notify_all(); // To prevent dead-lock

    producer.join();
    consumer.join();

    //output the counters
    std::cout << producer_count << std::endl;
    std::cout << consumer_count << std::endl;

    return 0;
}

EDIT:

To avoid the sporadic off-by-one error you could use this:

#include <iostream>
#include <thread>
#include <condition_variable>
#include <queue>
#include <cstdlib>
#include <chrono>
#include <ctime>
#include <random>
#include <atomic>

//counts every number that is added to the queue
static long long producer_count = 0;
//counts every number that is taken out of the queue
static long long consumer_count = 0;

void generateNumbers(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load())
    {
        std::unique_lock<std::mutex> lk(m);
        int rndNum = rand() % 100;
        numbers.push(rndNum);
        producer_count++;
        cv.notify_one(); // Notify worker
        cv.wait(lk); // Wait for worker to complete
     }
}

void work(std::queue<int> & numbers, std::condition_variable & cv, std::mutex & m, std::atomic<bool> & workdone)
{
    while(!workdone.load() or !numbers.empty())
    {
        std::unique_lock<std::mutex> lk(m);
        cv.notify_one(); // Notify generator (placed here to avoid waiting for the lock)
        if (numbers.empty())
            cv.wait(lk); // Wait for the generator to complete
        if (numbers.empty())
            continue;
        std::cout << numbers.front() << std::endl;
        numbers.pop();
        consumer_count++;
     }
}

int main() {
    std::condition_variable cv;
    std::mutex m;
    std::atomic<bool> workdone(false);
    std::queue<int> numbers;

    //start threads
    std::thread producer(generateNumbers, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));
    std::thread consumer(work, std::ref(numbers), std::ref(cv), std::ref(m), std::ref(workdone));


    //wait for 3 seconds, then join the threads
    std::this_thread::sleep_for(std::chrono::seconds(1));
    workdone = true;
    cv.notify_all(); // To prevent dead-lock

    producer.join();
    consumer.join();

    //output the counters
    std::cout << producer_count << std::endl;
    std::cout << consumer_count << std::endl;

    return 0;
}
0
oreubens On

Note that this code may not work properly. the workdone variable is defined as a regular bool and it is perfectly legitimate for the compiler to assume that it can be safely optimized away because it never changes inside the block of code.

if you have a jerk reaction to just add volatile... Nope, that won't work either. You'll need to properly synchronize access to the workdone variable since both threads are reading, and another thread (the ui thread) is writing. An alternate solution would be to use something like an event instead of a simple variable.

But the explanation to your problem. Both threads have the same ending contition (!workdone), but they have a different duration, so there is currently nothing guaranteeing that producer and consumer are somehow synced to run at a similar amount of loops over time.