Pausing threads for synchronized operation

107 views Asked by At

I have a situation where N threads are working on a data structure simultaneously in small incremental steps. However, sometimes a synchronized action needs to take place.

So all threads need to halt, wait for one of the threads to perform this operation, and continue. I'm looking for a method that has as little as possible impact on the threads when the synchronized action is NOT required.

A simple option is using a shared_mutex, but I think an option with lower overhead is possible. I've attempted my own solution using a barrier and an atomic below.

So my questions are: Is this an effective solution for the problem? Is there a better solution?

#include <vector>
#include <thread>
#include <atomic>
#include <barrier>

int main()
{
    const size_t nr_threads = 10;
    std::vector<std::thread> threads;

    std::barrier barrier { nr_threads };
    std::atomic_bool sync_required { false };

    auto rare_condition = []() { return std::rand() == 42; };

    for (int i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&, i]()
        {
            while (true)
            {
                if (sync_required)
                {
                    if (i == 0)
                    {
                        barrier.arrive_and_wait();
                        sync_required = false;
                        // solo synchronized work
                        barrier.arrive_and_wait();
                    }
                    else
                    {
                        barrier.arrive_and_wait();
                        barrier.arrive_and_wait();
                    }
                }

                // standard loop body ...

                // sometimes a global synchronized action is required
                if (rare_condition()) sync_required = true;
            }
        });
    }

    // eventually ... treads quit

    for (auto& thread : threads) 
    {
        thread.join();
    }
}

Another solution with the shared_mutex needs a condition variable?

#include <array>
#include <atomic>
#include <thread>
#include <vector>
#include <barrier>
#include <shared_mutex>

int main()
{
    const size_t nr_threads = 10;
    std::vector<std::thread> threads;

    std::shared_mutex sync_mtx;
    std::atomic_bool sync_required { false };

    auto rare_condition = []() { return std::rand() == 42; };

    for (int i = 0; i < nr_threads; ++i)
    {
        threads.emplace_back([&, i]()
        {
            std::shared_lock shared_lock { sync_mtx };

            while (true)
            {
                // very rarely another thread requires all the others to stop for a bit
                if (sync_required)
                {
                    if (i == 0)
                    {
                        // unlock shared, but lock unique, seems a little odd but neccesary
                        shared_lock.unlock();
                        {
                            std::unique_lock unique_lock{ shared_lock };
                            sync_required = false;
                            // solo sync work
                        }
                        shared_lock.lock();
                    }
                    else
                    {
                        shared_lock.unlock();
                        // todo: need condition variable, which adds more complexity to this solution?
                        shared_lock.lock();
                    }
                }

                // sometimes a global syncronized action is required
                sync_required = sync_required || rare_condition();
            }
        });
    }

    // eventually ... treads quit

    for (auto& thread : threads) 
    {
        thread.join();
    }
}
1

There are 1 answers

0
Vincent Saulue-Laborde On BEST ANSWER

The major optimization I'll propose to your code is to replace the following

// Thread: if (sync_required) {
if (i == 0) { // An arbitrary thread is choosen to run sync().
    // It has to wait until other threads aknowledges having seen sync_required
    barrier.arrive_and_wait();
    sync();

By something like this

// In main:
std::atomic<int> unpaused_count{ nr_threads }; // number of threads that haven't noticed sync_required

// Thread: if (sync_required) {
int old_unpaused_count = unpaused_count.fetch_sub(1); // This thread just noticed sync_required, update the count
if (old_unpaused_count == 1) { // The thread noticing last is choosen to run sync()
    // It can run sync() immediately, since it knows that all other
    // threads have seen sync_required, and have decided to wait.
    sync();
    // TODO: restore unpaused_count & resume other threads.
} else { // This thread isn't the last to notice: wait.

If the targeted platform has a wait-free std::atomic<int>::fetch_sub (hopefully the case with all standard libraries on x64), this code can now choose a thread to run sync() in a wait-free manner, and the chosen thread starts running sync() immediately. This should be much better than a lock/barrier, if I didn't mess that bit of lock-free code (that's a big IF).

My second suggestion is to use std::counting_semaphore to let the sync() thread notify the waiting threads that the sync period is over. The use case is well described on cppreference:

Semaphores are also often used for the semantics of signaling/notifying rather than mutual exclusion, by initializing the semaphore with ​0​ and thus blocking the receiver(s) that try to acquire(), until the notifier "signals" by invoking release(n). In this respect semaphores can be considered alternatives to std::condition_variables, often with better performance.

With the two proposed optimizations, the thread chosen to run sync() never needs to acquire a lock, or wait on some barrier/condition variable. This is highly desirable: the faster it finishes its sync() section, the faster the whole system can restart.

Finally, full code (godbolt):

#include <atomic>
#include <limits>
#include <semaphore>
#include <thread>
#include <vector>

using ThreadCount = int;
static constexpr auto maxThreadCount = std::numeric_limits<ThreadCount>::max();

int main() {
    const ThreadCount nr_threads = 10;
    std::vector<std::thread> threads;

    struct SharedVars {
        std::counting_semaphore<maxThreadCount> end_of_sync{ 0 };
        std::atomic<bool> sync_required{ false };
        std::atomic<ThreadCount> unpaused_count{ nr_threads };
    };
    SharedVars shared;

    auto rare_condition = []() { return std::rand() == 42; };

    for (ThreadCount i = 0; i < nr_threads; ++i) {
        threads.emplace_back([&shared, rare_condition]() {
            while (true) {
                if (shared.sync_required) {
                    ThreadCount old_unpaused_count = shared.unpaused_count.fetch_sub(1);
                    if (old_unpaused_count == 1) {
                        // SYNC section here
                        shared.unpaused_count.store(nr_threads);
                        shared.sync_required.store(false);
                        shared.end_of_sync.release(nr_threads - 1);
                    } else {
                        shared.end_of_sync.acquire();
                    }
                }
                // standard loop body ...

                if (rare_condition()) {
                    shared.sync_required = true;
                }
            }
        });
    }

    for (auto& thread : threads) {
        thread.join();
    }
}

It is certainly possible to use a weaker memory order than SEQ_CST on some of the atomic operations. Reasoning about weaker memory orders is far above my skill, so I'm leaving it this way.