How can I run a detached thread that waits on a condition variable in a loop?


I've got a very time-critical simulator that is supposed to run as fast as possible. I tried to increase data throughput by distributing each data frame across multiple worker threads. To avoid the overhead of creating new threads for every frame, the worker threads are detached, never exit, and wait on a condition variable that signals them when their data is ready.
It is important to note that none of the threads access the same data: each one works on its own fixed, non-overlapping memory region.

Each worker thread is assigned its own thread_data_t struct and runs the following function:

```cpp
void worker(thread_data_t* thread_data) {

    thread_data->status = THREAD_STATUS_WAITING;

    while (thread_data->status != THREAD_STATUS_CLOSEING) {

        std::unique_lock<std::mutex> lock(thread_data->mutex);

        // thread_data->status set by calling thread, along with .notify_one()
        thread_data->condition.wait(lock, [thread_data]() { return thread_data->status != THREAD_STATUS_WAITING; });

        if (thread_data->status == THREAD_STATUS_ASSIGNED) {
            // work and set thread_data->result
        }

        thread_data->status = THREAD_STATUS_WAITING;
    }
}
```

The simulator function decides when all data for a thread is present and that thread should start working. It signals this by setting thread_data->status = THREAD_STATUS_ASSIGNED; on that thread's thread_data.

```cpp
size_t simulate(simulator_t self) {

    size_t sum = 0;

    while (sum < self->done_count) {

        // Notify all threads that their data is present and they should start working
        for (size_t i = 0; i < self->num_threads; i++) {

            self->calculation_threads[i].result = 0;

            {
                std::lock_guard<std::mutex> guard(self->calculation_threads[i].mutex);
                self->calculation_threads[i].status = THREAD_STATUS_ASSIGNED;
            }

            self->calculation_threads[i].condition.notify_one();
        }

        // All threads should have started working now

        // Wait for all threads to finish
        for (size_t i = 0; i < self->num_threads; i++) {

            while (self->calculation_threads[i].status != THREAD_STATUS_WAITING) {
                // This is the endless loop occurring
                self->calculation_threads[i].condition.notify_one(); // Notify again in case it was missed the first time
            }

            sum += self->calculation_threads[i].result;
        }
    }

    return sum;
}
```

Problem

  1. The first time simulate() is called, it runs correctly. On the second call, however, the simulator gets stuck in the while loop that waits for all threads to finish. It seems that in the second run the threads never start working, and I don't understand why.
  2. The code works with up to 2 threads, but not with 3 or more. A thread's thread_data is not modified anywhere outside the worker and simulate functions.

The question is: WHY?

My guess is that I'm doing something wrong with the mutexes that I haven't understood yet.

Any help on this is much appreciated!
