c++ threading: cv.notify_one() blocks?

920 views Asked by At

I wrote the following structure to implement a simple single producer / multi consumer synchronization. I'm using two integers available_index and consumed_index, access to consumed_index is protected by the condition variable cv. Here's the code:

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <thread>

struct ParserSync {
    std::mutex worker_lock;
    std::condition_variable cv;
    int consumed_index = -1;
    int available_index = -1;
    bool exit_flag = false;

    int consume_index() {
        int ret = -1;

        // get worker_lock
        std::unique_lock<std::mutex> w_lock(worker_lock);

        // wait for exit_flag or new available index
        cv.wait(w_lock, [this] { return exit_flag || available_index > consumed_index; });

        if (available_index > consumed_index) {
            consumed_index++;
            ret = consumed_index;
        }

        // Unlock mutex and notify another thread
        w_lock.unlock();
        cv.notify_one();

        return ret;
    }

    void publish_index() {
        available_index++;
        std::cout << "before" << std::endl;
        cv.notify_one();
        std::cout << "after" << std::endl;
    }

    void set_exit() {
        exit_flag = true;
        cv.notify_all();
    }
};

I tested my implementation using the following code (just a simple example to show the problem):

void producer(ParserSync &ps){
    for (int i=0;i<5000;i++){
        ps.publish_index();
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
    ps.set_exit();
    std::cout << "Producer finished!" << std::endl;
}

void consumer(ParserSync &ps){
    while (true){
        int idx = ps.consume_index();
        if (idx == -1)
            break;
        std::this_thread::sleep_for(std::chrono::milliseconds(4));
    }
    std::cout << "Consumer finished!" << std::endl;
}

int main() {
    ParserSync ps{};
    const int num_consumers = 4;
    std::vector<std::thread> consumer_threads(num_consumers);

    // start consumers
    for (int i = 0; i < num_consumers; ++i) {
        consumer_threads[i] = std::thread{consumer, std::ref(ps)};
    }
    // start producer
    std::thread producer_thread = std::thread{producer, std::ref(ps)};

    for (int i = 0; i < num_consumers; ++i) {
        consumer_threads[i].join();
    }
    producer_thread.join();
    std::cout << "Program finished" << std::endl;
    return 0;
}

I would expect that producer thread produces 5000 indices and exits afterwards, but unfortunately, it gets stuck at some random iteration. I used print statements to find the code line that blocks and tracked it down to cv.notify_one();. This is the (shortened) console output:

...
before
after
before
after
before

Does anyone know why the call to cv.notify_one(); blocks?

I'm using MinGW (x86_64-6.2.0-posix-seh-rt_v5-rev1) on Windows 10.

Thanks in advance!

EDIT:

When compiling the exact same code with Visual Studio, the program works as expected and doesn't lock itself up. Unfortunately, I need to use MinGW for other reasons.

0

There are 0 answers